1
0
forked from ScoDoc/ScoDoc
ScoDoc/app/api/justificatifs.py

808 lines
25 KiB
Python
Raw Permalink Normal View History

##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################
"""ScoDoc 9 API : Assiduités
"""
from datetime import datetime
from flask_json import as_json
from flask import g, request
from flask_login import login_required, current_user
from flask_sqlalchemy.query import Query
import app.scodoc.sco_assiduites as scass
import app.scodoc.sco_utils as scu
from app import db
from app.api import api_bp as bp
from app.api import api_web_bp
2023-07-27 14:58:57 +02:00
from app.api import get_model_api_object, tools
from app.decorators import permission_required, scodoc
from app.models import (
Identite,
Justificatif,
Departement,
FormSemestre,
FormSemestreInscription,
)
from app.models.assiduites import (
compute_assiduites_justified,
)
from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_utils import json_error
from app.scodoc.sco_groups import get_group_members
# Partie Modèle
@bp.route("/justificatif/<int:justif_id>")
@api_web_bp.route("/justificatif/<int:justif_id>")
@scodoc
@permission_required(Permission.ScoView)
def justificatif(justif_id: int = None):
"""Retourne un objet justificatif à partir de son id
Exemple de résultat:
{
"justif_id": 1,
"etudid": 2,
"date_debut": "2022-10-31T08:00+01:00",
"date_fin": "2022-10-31T10:00+01:00",
"etat": "valide",
"fichier": "archive_id",
"raison": "une raison",
"entry_date": "2022-10-31T08:00+01:00",
"user_id": 1 or null,
}
"""
return get_model_api_object(Justificatif, justif_id, Identite)
2023-07-27 14:58:57 +02:00
# etudid
@bp.route("/justificatifs/<int:etudid>", defaults={"with_query": False})
@api_web_bp.route("/justificatifs/<int:etudid>", defaults={"with_query": False})
@bp.route("/justificatifs/<int:etudid>/query", defaults={"with_query": True})
@api_web_bp.route("/justificatifs/<int:etudid>/query", defaults={"with_query": True})
@bp.route("/justificatifs/etudid/<int:etudid>", defaults={"with_query": False})
@api_web_bp.route("/justificatifs/etudid/<int:etudid>", defaults={"with_query": False})
@bp.route("/justificatifs/etudid/<int:etudid>/query", defaults={"with_query": True})
@api_web_bp.route(
"/justificatifs/etudid/<int:etudid>/query", defaults={"with_query": True}
)
2023-07-27 14:58:57 +02:00
# nip
@bp.route("/justificatifs/nip/<nip>", defaults={"with_query": False})
@api_web_bp.route("/justificatifs/nip/<nip>", defaults={"with_query": False})
@bp.route("/justificatifs/nip/<nip>/query", defaults={"with_query": True})
@api_web_bp.route("/justificatifs/nip/<nip>/query", defaults={"with_query": True})
# ine
@bp.route("/justificatifs/ine/<ine>", defaults={"with_query": False})
@api_web_bp.route("/justificatifs/ine/<ine>", defaults={"with_query": False})
@bp.route("/justificatifs/ine/<ine>/query", defaults={"with_query": True})
@api_web_bp.route("/justificatifs/ine/<ine>/query", defaults={"with_query": True})
#
@login_required
@scodoc
@as_json
@permission_required(Permission.ScoView)
2023-07-27 14:58:57 +02:00
def justificatifs(etudid: int = None, nip=None, ine=None, with_query: bool = False):
"""
Retourne toutes les assiduités d'un étudiant
chemin : /justificatifs/<int:etudid>
Un filtrage peut être donné avec une query
chemin : /justificatifs/<int:etudid>/query?
Les différents filtres :
Etat (etat du justificatif -> validé, non validé, modifé, en attente):
query?etat=[- liste des états séparé par une virgule -]
ex: .../query?etat=validé,modifié
Date debut
(date de début du justificatif, sont affichés les justificatifs
dont la date de début est supérieur ou égale à la valeur donnée):
query?date_debut=[- date au format iso -]
ex: query?date_debut=2022-11-03T08:00+01:00
Date fin
(date de fin du justificatif, sont affichés les justificatifs
dont la date de fin est inférieure ou égale à la valeur donnée):
query?date_fin=[- date au format iso -]
ex: query?date_fin=2022-11-03T10:00+01:00
user_id (l'id de l'auteur du justificatif)
query?user_id=[int]
ex query?user_id=3
"""
2023-07-27 14:58:57 +02:00
etud: Identite = tools.get_etud(etudid, nip, ine)
2023-07-27 14:58:57 +02:00
if etud is None:
return json_error(
404,
message="étudiant inconnu",
)
justificatifs_query = etud.justificatifs
if with_query:
justificatifs_query = _filter_manager(request, justificatifs_query)
data_set: list[dict] = []
for just in justificatifs_query.all():
data = just.to_dict(format_api=True)
data_set.append(data)
return data_set
2023-06-28 17:15:24 +02:00
@api_web_bp.route("/justificatifs/dept/<int:dept_id>", defaults={"with_query": False})
@api_web_bp.route(
"/justificatifs/dept/<int:dept_id>/query", defaults={"with_query": True}
)
@bp.route("/justificatifs/dept/<int:dept_id>", defaults={"with_query": False})
@bp.route("/justificatifs/dept/<int:dept_id>/query", defaults={"with_query": True})
2023-06-28 17:15:24 +02:00
@login_required
@scodoc
@as_json
@permission_required(Permission.ScoView)
2023-06-30 15:34:50 +02:00
def justificatifs_dept(dept_id: int = None, with_query: bool = False):
2023-06-28 17:15:24 +02:00
""" """
dept = Departement.query.get_or_404(dept_id)
etuds = [etud.id for etud in dept.etudiants]
justificatifs_query = Justificatif.query.filter(Justificatif.etudid.in_(etuds))
if with_query:
justificatifs_query = _filter_manager(request, justificatifs_query)
2023-06-28 17:15:24 +02:00
data_set: list[dict] = []
for just in justificatifs_query:
data_set.append(_set_sems_and_groupe(just))
2023-06-28 17:15:24 +02:00
return data_set
def _set_sems_and_groupe(justi: Justificatif) -> dict:
from app.scodoc.sco_groups import get_etud_groups
data = justi.to_dict(format_api=True)
formsemestre: FormSemestre = (
FormSemestre.query.join(
FormSemestreInscription,
FormSemestre.id == FormSemestreInscription.formsemestre_id,
)
.filter(
justi.date_debut <= FormSemestre.date_fin,
justi.date_fin >= FormSemestre.date_debut,
FormSemestreInscription.etudid == justi.etudid,
)
.first()
)
if formsemestre:
data["formsemestre"] = {
"id": formsemestre.id,
"title": formsemestre.session_id(),
}
return data
@bp.route(
"/justificatifs/formsemestre/<int:formsemestre_id>", defaults={"with_query": False}
)
@api_web_bp.route(
"/justificatifs/formsemestre/<int:formsemestre_id>", defaults={"with_query": False}
)
@bp.route(
"/justificatifs/formsemestre/<int:formsemestre_id>/query",
defaults={"with_query": True},
)
@api_web_bp.route(
"/justificatifs/formsemestre/<int:formsemestre_id>/query",
defaults={"with_query": True},
)
@login_required
@scodoc
@as_json
@permission_required(Permission.ScoView)
def justificatifs_formsemestre(formsemestre_id: int, with_query: bool = False):
"""Retourne tous les justificatifs du formsemestre"""
formsemestre: FormSemestre = None
formsemestre_id = int(formsemestre_id)
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()
if formsemestre is None:
return json_error(404, "le paramètre 'formsemestre_id' n'existe pas")
justificatifs_query = scass.filter_by_formsemestre(
Justificatif.query, Justificatif, formsemestre
)
if with_query:
justificatifs_query = _filter_manager(request, justificatifs_query)
data_set: list[dict] = []
for justi in justificatifs_query.all():
data = justi.to_dict(format_api=True)
data_set.append(data)
return data_set
@bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
@api_web_bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
@bp.route("/justificatif/etudid/<int:etudid>/create", methods=["POST"])
@api_web_bp.route("/justificatif/etudid/<int:etudid>/create", methods=["POST"])
# nip
@bp.route("/justificatif/nip/<nip>/create", methods=["POST"])
@api_web_bp.route("/justificatif/nip/<nip>/create", methods=["POST"])
# ine
@bp.route("/justificatif/ine/<ine>/create", methods=["POST"])
@api_web_bp.route("/justificatif/ine/<ine>/create", methods=["POST"])
@scodoc
@login_required
@as_json
@permission_required(Permission.AbsChange)
def justif_create(etudid: int = None, nip=None, ine=None):
"""
Création d'un justificatif pour l'étudiant (etudid)
La requête doit avoir un content type "application/json":
[
{
"date_debut": str,
"date_fin": str,
"etat": str,
},
{
"date_debut": str,
"date_fin": str,
"etat": str,
"raison":str,
}
...
]
"""
2023-07-27 14:58:57 +02:00
etud: Identite = tools.get_etud(etudid, nip, ine)
if etud is None:
return json_error(
404,
message="étudiant inconnu",
)
create_list: list[object] = request.get_json(force=True)
if not isinstance(create_list, list):
return json_error(404, "Le contenu envoyé n'est pas une liste")
errors: list = []
success: list = []
justifs: list = []
for i, data in enumerate(create_list):
code, obj, justi = _create_one(data, etud)
if code == 404:
errors.append({"indice": i, "message": obj})
else:
success.append({"indice": i, "message": obj})
justifs.append(justi)
2023-06-30 15:34:50 +02:00
scass.simple_invalidate_cache(data, etud.id)
compute_assiduites_justified(etud.etudid, justifs)
return {"errors": errors, "success": success}
def _create_one(
data: dict,
etud: Identite,
) -> tuple[int, object]:
errors: list[str] = []
# -- vérifications de l'objet json --
# cas 1 : ETAT
etat = data.get("etat", None)
if etat is None:
errors.append("param 'etat': manquant")
elif not scu.EtatJustificatif.contains(etat):
errors.append("param 'etat': invalide")
etat = scu.EtatJustificatif.get(etat)
# cas 2 : date_debut
date_debut = data.get("date_debut", None)
if date_debut is None:
errors.append("param 'date_debut': manquant")
deb = scu.is_iso_formated(date_debut, convert=True)
if deb is None:
errors.append("param 'date_debut': format invalide")
# cas 3 : date_fin
date_fin = data.get("date_fin", None)
if date_fin is None:
errors.append("param 'date_fin': manquant")
fin = scu.is_iso_formated(date_fin, convert=True)
if fin is None:
errors.append("param 'date_fin': format invalide")
# cas 4 : raison
raison: str = data.get("raison", None)
external_data = data.get("external_data")
if external_data is not None:
if not isinstance(external_data, dict):
errors.append("param 'external_data' : n'est pas un objet JSON")
if errors:
err: str = ", ".join(errors)
return (404, err, None)
# TOUT EST OK
try:
nouv_justificatif: Query = Justificatif.create_justificatif(
date_debut=deb,
date_fin=fin,
etat=etat,
etud=etud,
raison=raison,
user_id=current_user.id,
external_data=external_data,
)
db.session.add(nouv_justificatif)
db.session.commit()
return (
200,
{
"justif_id": nouv_justificatif.id,
"couverture": scass.justifies(nouv_justificatif),
},
nouv_justificatif,
)
except ScoValueError as excp:
return (
404,
excp.args[0],
)
@bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"])
@login_required
@scodoc
@as_json
@permission_required(Permission.AbsChange)
def justif_edit(justif_id: int):
"""
Edition d'un justificatif à partir de son id
La requête doit avoir un content type "application/json":
{
"etat"?: str,
"raison"?: str
"date_debut"?: str
"date_fin"?: str
}
"""
justificatif_unique: Query = Justificatif.query.filter_by(
id=justif_id
).first_or_404()
errors: list[str] = []
data = request.get_json(force=True)
avant_ids: list[int] = scass.justifies(justificatif_unique)
# Vérifications de data
# Cas 1 : Etat
if data.get("etat") is not None:
etat = scu.EtatJustificatif.get(data.get("etat"))
if etat is None:
errors.append("param 'etat': invalide")
else:
justificatif_unique.etat = etat
# Cas 2 : raison
raison = data.get("raison", False)
if raison is not False:
justificatif_unique.raison = raison
deb, fin = None, None
# cas 3 : date_debut
date_debut = data.get("date_debut", False)
if date_debut is not False:
if date_debut is None:
errors.append("param 'date_debut': manquant")
deb = scu.is_iso_formated(date_debut.replace(" ", "+"), convert=True)
if deb is None:
errors.append("param 'date_debut': format invalide")
# cas 4 : date_fin
date_fin = data.get("date_fin", False)
if date_fin is not False:
if date_fin is None:
errors.append("param 'date_fin': manquant")
fin = scu.is_iso_formated(date_fin.replace(" ", "+"), convert=True)
if fin is None:
errors.append("param 'date_fin': format invalide")
# Mise à jour des dates
deb = deb if deb is not None else justificatif_unique.date_debut
fin = fin if fin is not None else justificatif_unique.date_fin
external_data = data.get("external_data")
if external_data is not None:
if not isinstance(external_data, dict):
errors.append("param 'external_data' : n'est pas un objet JSON")
else:
justificatif_unique.external_data = external_data
2023-06-23 16:12:36 +02:00
if fin <= deb:
errors.append("param 'dates' : Date de début après date de fin")
justificatif_unique.date_debut = deb
justificatif_unique.date_fin = fin
if errors:
err: str = ", ".join(errors)
return json_error(404, err)
db.session.add(justificatif_unique)
db.session.commit()
2023-06-30 15:34:50 +02:00
retour = {
"couverture": {
"avant": avant_ids,
"après": compute_assiduites_justified(
justificatif_unique.etudid,
[justificatif_unique],
2023-09-24 09:18:13 +02:00
True,
),
}
}
2023-06-30 15:34:50 +02:00
scass.simple_invalidate_cache(justificatif_unique.to_dict())
return retour
@bp.route("/justificatif/delete", methods=["POST"])
@api_web_bp.route("/justificatif/delete", methods=["POST"])
@login_required
@scodoc
@as_json
@permission_required(Permission.AbsChange)
def justif_delete():
"""
Suppression d'un justificatif à partir de son id
Forme des données envoyées :
[
<justif_id:int>,
...
]
"""
justificatifs_list: list[int] = request.get_json(force=True)
if not isinstance(justificatifs_list, list):
return json_error(404, "Le contenu envoyé n'est pas une liste")
output = {"errors": [], "success": []}
for i, ass in enumerate(justificatifs_list):
code, msg = _delete_singular(ass, db)
if code == 404:
output["errors"].append({"indice": i, "message": msg})
else:
output["success"].append({"indice": i, "message": "OK"})
2023-06-30 15:34:50 +02:00
db.session.commit()
return output
def _delete_singular(justif_id: int, database):
justificatif_unique: Query = Justificatif.query.filter_by(id=justif_id).first()
if justificatif_unique is None:
return (404, "Justificatif non existant")
archive_name: str = justificatif_unique.fichier
if archive_name is not None:
archiver: JustificatifArchiver = JustificatifArchiver()
try:
archiver.delete_justificatif(justificatif_unique.etudiant, archive_name)
except ValueError:
pass
2023-06-30 15:34:50 +02:00
scass.simple_invalidate_cache(justificatif_unique.to_dict())
database.session.delete(justificatif_unique)
compute_assiduites_justified(
justificatif_unique.etudid,
Justificatif.query.filter_by(etudid=justificatif_unique.etudid).all(),
True,
)
2023-06-30 15:34:50 +02:00
return (200, "OK")
# Partie archivage
@bp.route("/justificatif/<int:justif_id>/import", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/import", methods=["POST"])
@scodoc
@login_required
@as_json
@permission_required(Permission.AbsChange)
def justif_import(justif_id: int = None):
"""
Importation d'un fichier (création d'archive)
"""
if len(request.files) == 0:
return json_error(404, "Il n'y a pas de fichier joint")
file = list(request.files.values())[0]
if file.filename == "":
return json_error(404, "Il n'y a pas de fichier joint")
query: Query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
archiver: JustificatifArchiver = JustificatifArchiver()
try:
fname: str
archive_name, fname = archiver.save_justificatif(
justificatif_unique.etudiant,
filename=file.filename,
data=file.stream.read(),
archive_name=archive_name,
user_id=current_user.id,
)
justificatif_unique.fichier = archive_name
db.session.add(justificatif_unique)
db.session.commit()
return {"filename": fname}
except ScoValueError as err:
return json_error(404, err.args[0])
@bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"])
@scodoc
@login_required
@permission_required(Permission.AbsChange)
def justif_export(justif_id: int = None, filename: str = None):
"""
Retourne un fichier d'une archive d'un justificatif
"""
query: Query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
if archive_name is None:
return json_error(404, "le justificatif ne possède pas de fichier")
archiver: JustificatifArchiver = JustificatifArchiver()
try:
return archiver.get_justificatif_file(
archive_name, justificatif_unique.etudiant, filename
)
except ScoValueError as err:
return json_error(404, err.args[0])
@bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"])
@scodoc
@login_required
@as_json
@permission_required(Permission.AbsChange)
def justif_remove(justif_id: int = None):
"""
Supression d'un fichier ou d'une archive
# TOTALK: Doc, expliquer les noms coté server
{
"remove": <"all"/"list">
"filenames"?: [
<filename:str>,
...
]
}
"""
data: dict = request.get_json(force=True)
query: Query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
if archive_name is None:
return json_error(404, "le justificatif ne possède pas de fichier")
remove: str = data.get("remove")
if remove is None or remove not in ("all", "list"):
return json_error(404, "param 'remove': Valeur invalide")
archiver: JustificatifArchiver = JustificatifArchiver()
etud = justificatif_unique.etudiant
try:
if remove == "all":
archiver.delete_justificatif(etud, archive_name=archive_name)
justificatif_unique.fichier = None
db.session.add(justificatif_unique)
db.session.commit()
else:
for fname in data.get("filenames", []):
archiver.delete_justificatif(
etud,
archive_name=archive_name,
filename=fname,
)
if len(archiver.list_justificatifs(archive_name, etud)) == 0:
archiver.delete_justificatif(etud, archive_name)
justificatif_unique.fichier = None
db.session.add(justificatif_unique)
db.session.commit()
except ScoValueError as err:
return json_error(404, err.args[0])
return {"response": "removed"}
@bp.route("/justificatif/<int:justif_id>/list", methods=["GET"])
@api_web_bp.route("/justificatif/<int:justif_id>/list", methods=["GET"])
@scodoc
@login_required
@as_json
@permission_required(Permission.ScoView)
def justif_list(justif_id: int = None):
"""
Liste les fichiers du justificatif
"""
query: Query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
archive_name: str = justificatif_unique.fichier
filenames: list[str] = []
archiver: JustificatifArchiver = JustificatifArchiver()
if archive_name is not None:
filenames = archiver.list_justificatifs(
archive_name, justificatif_unique.etudiant
)
retour = {"total": len(filenames), "filenames": []}
for filename in filenames:
if int(filename[1]) == current_user.id or current_user.has_permission(
Permission.AbsJustifView
):
retour["filenames"].append(filename[0])
return retour
# Partie justification
@bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"])
@api_web_bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"])
@scodoc
@login_required
@as_json
@permission_required(Permission.AbsChange)
def justif_justifies(justif_id: int = None):
"""
Liste assiduite_id justifiées par le justificatif
"""
query: Query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique: Justificatif = query.first_or_404()
assiduites_list: list[int] = scass.justifies(justificatif_unique)
return assiduites_list
# -- Utils --
def _filter_manager(requested, justificatifs_query):
"""
Retourne les justificatifs entrés filtrés en fonction de la request
"""
# cas 1 : etat justificatif
etat = requested.args.get("etat")
if etat is not None:
justificatifs_query = scass.filter_justificatifs_by_etat(
justificatifs_query, etat
)
# cas 2 : date de début
deb = requested.args.get("date_debut", "").replace(" ", "+")
deb: datetime = scu.is_iso_formated(deb, True)
# cas 3 : date de fin
fin = requested.args.get("date_fin", "").replace(" ", "+")
fin = scu.is_iso_formated(fin, True)
if (deb, fin) != (None, None):
justificatifs_query: Query = scass.filter_by_date(
justificatifs_query, Justificatif, deb, fin
)
user_id = requested.args.get("user_id", False)
if user_id is not False:
justificatifs_query: Query = scass.filter_by_user_id(
justificatifs_query, user_id
)
# cas 5 : formsemestre_id
formsemestre_id = requested.args.get("formsemestre_id")
if formsemestre_id not in [None, "", -1]:
formsemestre: FormSemestre = None
try:
formsemestre_id = int(formsemestre_id)
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()
justificatifs_query = scass.filter_by_formsemestre(
justificatifs_query, Justificatif, formsemestre
)
except ValueError:
formsemestre = None
2023-09-11 08:31:09 +02:00
order = requested.args.get("order", None)
if order is not None:
justificatifs_query: Query = justificatifs_query.order_by(
Justificatif.date_debut.desc()
)
2023-09-12 09:37:03 +02:00
courant = requested.args.get("courant", None)
if courant is not None:
annee: int = scu.annee_scolaire()
justificatifs_query: Query = justificatifs_query.filter(
Justificatif.date_debut >= scu.date_debut_anne_scolaire(annee),
Justificatif.date_fin <= scu.date_fin_anne_scolaire(annee),
)
group_id = requested.args.get("group_id", None)
if group_id is not None:
try:
group_id = int(group_id)
etudids: list[int] = [etu["etudid"] for etu in get_group_members(group_id)]
justificatifs_query = justificatifs_query.filter(
Justificatif.etudid.in_(etudids)
)
except ValueError:
group_id = None
return justificatifs_query