1
0
forked from ScoDoc/ScoDoc

api justificatif : modèle + api ( archivage)

This commit is contained in:
iziram 2023-02-01 15:08:06 +01:00
parent 86f5751e79
commit 8bc780f2cf
12 changed files with 907 additions and 64 deletions

View File

@ -42,6 +42,7 @@ from app.api import (
formations, formations,
formsemestres, formsemestres,
jury, jury,
justificatif,
logos, logos,
partitions, partitions,
users, users,

View File

@ -6,7 +6,6 @@
"""ScoDoc 9 API : Assiduités """ScoDoc 9 API : Assiduités
""" """
from datetime import datetime from datetime import datetime
from typing import List
from flask import g, jsonify, request from flask import g, jsonify, request
from flask_login import login_required from flask_login import login_required
@ -48,7 +47,7 @@ def assiduite(assiduite_id: int = None):
data = assiduite_query.to_dict() data = assiduite_query.to_dict()
return jsonify(change_etat(data)) return jsonify(_change_etat(data))
@bp.route("/assiduites/<int:etudid>/count", defaults={"with_query": False}) @bp.route("/assiduites/<int:etudid>/count", defaults={"with_query": False})
@ -104,7 +103,7 @@ def count_assiduites(etudid: int = None, with_query: bool = False):
metric: str = "all" metric: str = "all"
if with_query: if with_query:
metric, filtered = count_manager(request) metric, filtered = _count_manager(request)
return jsonify( return jsonify(
scass.get_assiduites_stats( scass.get_assiduites_stats(
@ -161,12 +160,12 @@ def assiduites(etudid: int = None, with_query: bool = False):
assiduites_query = etud.assiduites assiduites_query = etud.assiduites
if with_query: if with_query:
assiduites_query = filter_manager(request, assiduites_query) assiduites_query = _filter_manager(request, assiduites_query)
data_set: List[dict] = [] data_set: list[dict] = []
for ass in assiduites_query.all(): for ass in assiduites_query.all():
data = ass.to_dict() data = ass.to_dict()
data_set.append(change_etat(data)) data_set.append(_change_etat(data))
return jsonify(data_set) return jsonify(data_set)
@ -199,12 +198,12 @@ def assiduites_formsemestre(formsemestre_id: int, with_query: bool = False):
assiduites_query = scass.filter_by_formsemestre(Assiduite.query, formsemestre) assiduites_query = scass.filter_by_formsemestre(Assiduite.query, formsemestre)
if with_query: if with_query:
assiduites_query = filter_manager(request, assiduites_query) assiduites_query = _filter_manager(request, assiduites_query)
data_set: List[dict] = [] data_set: list[dict] = []
for ass in assiduites_query.all(): for ass in assiduites_query.all():
data = ass.to_dict() data = ass.to_dict()
data_set.append(change_etat(data)) data_set.append(_change_etat(data))
return jsonify(data_set) return jsonify(data_set)
@ -246,7 +245,7 @@ def count_assiduites_formsemestre(
metric: str = "all" metric: str = "all"
filtered: dict = {} filtered: dict = {}
if with_query: if with_query:
metric, filtered = count_manager(request) metric, filtered = _count_manager(request)
return jsonify(scass.get_assiduites_stats(assiduites_query, metric, filtered)) return jsonify(scass.get_assiduites_stats(assiduites_query, metric, filtered))
@ -257,7 +256,7 @@ def count_assiduites_formsemestre(
@login_required @login_required
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange) # @permission_required(Permission.ScoAssiduiteChange)
def create(etudid: int = None): def assiduite_create(etudid: int = None):
""" """
Création d'une assiduité pour l'étudiant (etudid) Création d'une assiduité pour l'étudiant (etudid)
La requête doit avoir un content type "application/json": La requête doit avoir un content type "application/json":
@ -277,9 +276,6 @@ def create(etudid: int = None):
... ...
] ]
TODO:
- vérifier si l'entrée est bien une liste
""" """
etud: Identite = Identite.query.filter_by(id=etudid).first_or_404() etud: Identite = Identite.query.filter_by(id=etudid).first_or_404()
@ -304,7 +300,7 @@ def _create_singular(
data: dict, data: dict,
etud: Identite, etud: Identite,
) -> tuple[int, object]: ) -> tuple[int, object]:
errors: List[str] = [] errors: list[str] = []
# -- vérifications de l'objet json -- # -- vérifications de l'objet json --
# cas 1 : ETAT # cas 1 : ETAT
@ -314,7 +310,7 @@ def _create_singular(
elif etat not in scu.ETATS_ASSIDUITE: elif etat not in scu.ETATS_ASSIDUITE:
errors.append("param 'etat': invalide") errors.append("param 'etat': invalide")
data = change_etat(data, False) data = _change_etat(data, False)
etat = data.get("etat", None) etat = data.get("etat", None)
# cas 2 : date_debut # cas 2 : date_debut
@ -379,7 +375,7 @@ def _create_singular(
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange) # @permission_required(Permission.ScoAssiduiteChange)
def delete(): def assiduite_cdelete():
""" """
Suppression d'une assiduité à partir de son id Suppression d'une assiduité à partir de son id
@ -422,7 +418,7 @@ def _delete_singular(assiduite_id: int, database):
@scodoc @scodoc
@permission_required(Permission.ScoView) @permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange) # @permission_required(Permission.ScoAssiduiteChange)
def edit(assiduite_id: int): def assiduite_cedit(assiduite_id: int):
""" """
Edition d'une assiduité à partir de son id Edition d'une assiduité à partir de son id
La requête doit avoir un content type "application/json": La requête doit avoir un content type "application/json":
@ -435,14 +431,14 @@ def edit(assiduite_id: int):
assiduite_unique: Assiduite = Assiduite.query.filter_by( assiduite_unique: Assiduite = Assiduite.query.filter_by(
id=assiduite_id id=assiduite_id
).first_or_404() ).first_or_404()
errors: List[str] = [] errors: list[str] = []
data = request.get_json(force=True) data = request.get_json(force=True)
# Vérifications de data # Vérifications de data
# Cas 1 : Etat # Cas 1 : Etat
if data.get("etat") is not None: if data.get("etat") is not None:
data = change_etat(data, False) data = _change_etat(data, False)
if data.get("etat") is None: if data.get("etat") is None:
errors.append("param 'etat': invalide") errors.append("param 'etat': invalide")
else: else:
@ -482,7 +478,7 @@ def edit(assiduite_id: int):
# -- Utils -- # -- Utils --
def change_etat(data: dict, from_int: bool = True): def _change_etat(data: dict, from_int: bool = True):
"""change dans un json la valeur du champs état""" """change dans un json la valeur du champs état"""
if from_int: if from_int:
data["etat"] = scu.ETAT_ASSIDUITE_NAME.get(data["etat"]) data["etat"] = scu.ETAT_ASSIDUITE_NAME.get(data["etat"])
@ -491,7 +487,7 @@ def change_etat(data: dict, from_int: bool = True):
return data return data
def count_manager(requested) -> tuple[str, dict]: def _count_manager(requested) -> tuple[str, dict]:
""" """
Retourne la/les métriques à utiliser ainsi que le filtre donnés en query de la requête Retourne la/les métriques à utiliser ainsi que le filtre donnés en query de la requête
""" """
@ -544,28 +540,32 @@ def count_manager(requested) -> tuple[str, dict]:
return (metric, filtered) return (metric, filtered)
def filter_manager(requested, assiduites_query): def _filter_manager(requested, assiduites_query):
""" """
Retourne les assiduites entrées filtrées en fonction de la request Retourne les assiduites entrées filtrées en fonction de la request
""" """
# cas 1 : etat assiduite # cas 1 : etat assiduite
etat = requested.args.get("etat") etat = requested.args.get("etat")
if etat is not None: if etat is not None:
assiduites_query = scass.filter_by_etat(assiduites_query, etat) assiduites_query = scass.filter_assiduites_by_etat(assiduites_query, etat)
# cas 2 : date de début # cas 2 : date de début
deb = requested.args.get("date_debut") deb = requested.args.get("date_debut")
deb: datetime = scu.is_iso_formated(deb, True) deb: datetime = scu.is_iso_formated(deb, True)
if deb is not None: if deb is not None:
assiduites_query = scass.filter_by_date(assiduites_query, deb, sup=True) assiduites_query = scass.filter_assiduites_by_date(
assiduites_query, deb, sup=True
)
# cas 3 : date de fin # cas 3 : date de fin
fin = requested.args.get("date_fin") fin = requested.args.get("date_fin")
fin = scu.is_iso_formated(fin, True) fin = scu.is_iso_formated(fin, True)
if fin is not None: if fin is not None:
assiduites_query = scass.filter_by_date(assiduites_query, fin, sup=False) assiduites_query = scass.filter_assiduites_by_date(
assiduites_query, fin, sup=False
)
# cas 4 : moduleimpl_id # cas 4 : moduleimpl_id
module = requested.args.get("moduleimpl_id", False) module = requested.args.get("moduleimpl_id", False)

386
app/api/justificatif.py Normal file
View File

@ -0,0 +1,386 @@
##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
# See LICENSE
##############################################################################
"""ScoDoc 9 API : Assiduités
"""
import os
from datetime import datetime
import app.scodoc.sco_assiduites as scass
import app.scodoc.sco_utils as scu
from app import db
from app.api import api_bp as bp
from app.api import api_web_bp
from app.scodoc.sco_exceptions import ScoValueError
from app.decorators import permission_required, scodoc
from app.models import Identite, Justificatif
from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
from app.scodoc.sco_permissions import Permission
from flask import g, jsonify, request
from flask_login import login_required
from app.scodoc.sco_utils import json_error
# @bp.route("/justificatif/import")
# @api_web_bp.route("/justificatif/import")
# @scodoc
# def justificatif():
# """ """
# archiver: JustificatifArchiver = JustificatifArchiver()
# filename: str = "lol.txt"
# data: bytes = "test".encode("utf-8")
# archiver.save_justificatif(
# etudid=1, filename=filename, data=data, archive_id="2023-02-01-10-29-20"
# )
# return jsonify([filename, "done"])
# @bp.route("/justificatif/remove")
# @api_web_bp.route("/justificatif/remove")
# @scodoc
# def justremove():
# """ """
# archiver: JustificatifArchiver = JustificatifArchiver()
# archiver.delete_justificatif(etudid=1, archive_id="2023-02-01-10-29-20")
# return jsonify("done")
# Partie Modèle
# TODO: justificatif
@bp.route("/justificatif/<int:justif_id>")
@api_web_bp.route("/assiduite/<int:justif_id>")
@scodoc
@permission_required(Permission.ScoView)
def justificatif(justif_id: int = None):
"""Retourne un objet justificatif à partir de son id
Exemple de résultat:
{
"justif_id": 1,
"etudid": 2,
"date_debut": "2022-10-31T08:00+01:00",
"date_fin": "2022-10-31T10:00+01:00",
"etat": "valide",
"fichier": "archive_id",
"raison": "une raison",
"entry_date": "2022-10-31T08:00+01:00",
}
"""
query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
justificatif_unique = query.first_or_404()
data = justificatif_unique.to_dict()
return jsonify(_change_etat(data))
# TODO: justificatifs[-query]
@bp.route("/justificatifs/<int:etudid>", defaults={"with_query": False})
@bp.route("/justificatifs/<int:etudid>/query", defaults={"with_query": True})
@api_web_bp.route("/justificatifs/<int:etudid>", defaults={"with_query": False})
@api_web_bp.route("/justificatifs/<int:etudid>/query", defaults={"with_query": True})
@login_required
@scodoc
@permission_required(Permission.ScoView)
def justificatifs(etudid: int = None, with_query: bool = False):
"""
Retourne toutes les assiduités d'un étudiant
chemin : /justificatifs/<int:etudid>
Un filtrage peut être donné avec une query
chemin : /justificatifs/<int:etudid>/query?
Les différents filtres :
Etat (etat du justificatif -> validé, non validé, modifé, en attente):
query?etat=[- liste des états séparé par une virgule -]
ex: .../query?etat=validé,modifié
Date debut
(date de début du justificatif, sont affichés les justificatifs
dont la date de début est supérieur ou égale à la valeur donnée):
query?date_debut=[- date au format iso -]
ex: query?date_debut=2022-11-03T08:00+01:00
Date fin
(date de fin du justificatif, sont affichés les justificatifs
dont la date de fin est inférieure ou égale à la valeur donnée):
query?date_fin=[- date au format iso -]
ex: query?date_fin=2022-11-03T10:00+01:00
"""
query = Identite.query.filter_by(id=etudid)
if g.scodoc_dept:
query = query.filter_by(dept_id=g.scodoc_dept_id)
etud: Identite = query.first_or_404(etudid)
justificatifs_query = etud.justificatifs
if with_query:
justificatifs_query = _filter_manager(request, justificatifs_query)
data_set: list[dict] = []
for just in justificatifs_query.all():
data = just.to_dict()
data_set.append(_change_etat(data))
return jsonify(data_set)
# TODO: justificatif-create
@bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
@api_web_bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_create(etudid: int = None):
"""
Création d'un justificatif pour l'étudiant (etudid)
La requête doit avoir un content type "application/json":
[
{
"date_debut": str,
"date_fin": str,
"etat": str,
},
{
"date_debut": str,
"date_fin": str,
"etat": str,
"raison":str,
}
...
]
"""
etud: Identite = Identite.query.filter_by(id=etudid).first_or_404()
create_list: list[object] = request.get_json(force=True)
if not isinstance(create_list, list):
return json_error(404, "Le contenu envoyé n'est pas une liste")
errors: dict[int, str] = {}
success: dict[int, object] = {}
for i, data in enumerate(create_list):
code, obj = _create_singular(data, etud)
if code == 404:
errors[i] = obj
else:
success[i] = obj
return jsonify({"errors": errors, "success": success})
def _create_singular(
data: dict,
etud: Identite,
) -> tuple[int, object]:
errors: list[str] = []
# -- vérifications de l'objet json --
# cas 1 : ETAT
etat = data.get("etat", None)
if etat is None:
errors.append("param 'etat': manquant")
elif etat not in scu.ETATS_JUSTIFICATIF:
errors.append("param 'etat': invalide")
data = _change_etat(data, False)
etat = data.get("etat", None)
# cas 2 : date_debut
date_debut = data.get("date_debut", None)
if date_debut is None:
errors.append("param 'date_debut': manquant")
deb = scu.is_iso_formated(date_debut, convert=True)
if deb is None:
errors.append("param 'date_debut': format invalide")
# cas 3 : date_fin
date_fin = data.get("date_fin", None)
if date_fin is None:
errors.append("param 'date_fin': manquant")
fin = scu.is_iso_formated(date_fin, convert=True)
if fin is None:
errors.append("param 'date_fin': format invalide")
# cas 4 : raison
raison: str = data.get("raison", None)
if errors:
err: str = ", ".join(errors)
return (404, err)
# TOUT EST OK
try:
nouv_justificatif: Justificatif = Justificatif.create_justificatif(
date_debut=deb,
date_fin=fin,
etat=etat,
etud=etud,
raison=raison,
)
db.session.add(nouv_justificatif)
db.session.commit()
return (200, {"justif_id": nouv_justificatif.id})
except ScoValueError as excp:
return (
404,
excp.args[0],
)
# TODO: justificatif-edit
@bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_edit(justif_id: int):
"""
Edition d'un justificatif à partir de son id
La requête doit avoir un content type "application/json":
{
"etat"?: str,
"raison"?: str
}
"""
justificatif_unique: Justificatif = Justificatif.query.filter_by(
id=justif_id
).first_or_404()
errors: list[str] = []
data = request.get_json(force=True)
# Vérifications de data
# Cas 1 : Etat
if data.get("etat") is not None:
data = _change_etat(data, False)
if data.get("etat") is None:
errors.append("param 'etat': invalide")
else:
justificatif_unique.etat = data.get("etat")
# Cas 2 : raison
raison = data.get("raison", False)
if raison is not False:
justificatif_unique.raison = raison
if errors:
err: str = ", ".join(errors)
return json_error(404, err)
db.session.add(justificatif_unique)
db.session.commit()
return jsonify({"OK": True})
# TODO: justificatif-delete
@bp.route("/justificatif/delete", methods=["POST"])
@api_web_bp.route("/justificatif/delete", methods=["POST"])
@login_required
@scodoc
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_delete():
"""
Suppression d'un justificatif à partir de son id
Forme des données envoyées :
[
<justif_id:int>,
...
]
"""
justificatifs_list: list[int] = request.get_json(force=True)
if not isinstance(justificatifs_list, list):
return json_error(404, "Le contenu envoyé n'est pas une liste")
output = {"errors": {}, "success": {}}
for i, ass in enumerate(justificatifs_list):
code, msg = _delete_singular(ass, db)
if code == 404:
output["errors"][f"{i}"] = msg
else:
output["success"][f"{i}"] = {"OK": True}
db.session.commit()
return jsonify(output)
def _delete_singular(justif_id: int, database):
justificatif_unique: Justificatif = Justificatif.query.filter_by(
id=justif_id
).first()
if justificatif_unique is None:
return (404, "Justificatif non existant")
database.session.delete(justificatif_unique)
return (200, "OK")
# Partie archivage
# TODO: justificatif-import
# TODO: justificatif-export
# TODO: justificatif-remove
# TODO: justificatif-list
# Partie justification
# TODO: justificatif-justified
# -- Utils --
def _change_etat(data: dict, from_int: bool = True):
"""change dans un json la valeur du champs état"""
if from_int:
data["etat"] = scu.ETAT_JUSTIFICATIF_NAME.get(data["etat"])
else:
data["etat"] = scu.ETATS_JUSTIFICATIF.get(data["etat"])
return data
def _filter_manager(requested, justificatifs_query):
"""
Retourne les justificatifs entrés filtrés en fonction de la request
"""
# cas 1 : etat justificatif
etat = requested.args.get("etat")
if etat is not None:
justificatifs_query = scass.filter_justificatifs_by_etat(
justificatifs_query, etat
)
# cas 2 : date de début
deb = requested.args.get("date_debut")
deb: datetime = scu.is_iso_formated(deb, True)
if deb is not None:
justificatifs_query = scass.filter_justificatifs_by_date(
justificatifs_query, deb, sup=True
)
# cas 3 : date de fin
fin = requested.args.get("date_fin")
fin = scu.is_iso_formated(fin, True)
if fin is not None:
justificatifs_query = scass.filter_justificatifs_by_date(
justificatifs_query, fin, sup=False
)
return justificatifs_query

View File

@ -7,7 +7,12 @@ from app import db
from app.models import ModuleImpl from app.models import ModuleImpl
from app.models.etudiants import Identite from app.models.etudiants import Identite
from app.scodoc.sco_exceptions import ScoValueError from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_utils import EtatAssiduite, is_period_overlapping, localize_datetime from app.scodoc.sco_utils import (
EtatAssiduite,
EtatJustificatif,
is_period_overlapping,
localize_datetime,
)
class Assiduite(db.Model): class Assiduite(db.Model):
@ -123,7 +128,8 @@ class Justificatif(db.Model):
__tablename__ = "justificatifs" __tablename__ = "justificatifs"
justif_id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
justif_id = db.synonym("id")
date_debut = db.Column( date_debut = db.Column(
db.DateTime(timezone=True), server_default=db.func.now(), nullable=False db.DateTime(timezone=True), server_default=db.func.now(), nullable=False
@ -152,7 +158,7 @@ class Justificatif(db.Model):
def to_dict(self) -> dict: def to_dict(self) -> dict:
data = { data = {
"justif_id": self.assiduite_id, "justif_id": self.justif_id,
"etudid": self.etudid, "etudid": self.etudid,
"date_debut": self.date_debut, "date_debut": self.date_debut,
"date_fin": self.date_fin, "date_fin": self.date_fin,
@ -162,3 +168,41 @@ class Justificatif(db.Model):
"entry_date": self.entry_date, "entry_date": self.entry_date,
} }
return data return data
@classmethod
def create_justificatif(
cls,
etud: Identite,
date_debut: datetime,
date_fin: datetime,
etat: EtatJustificatif,
raison: str = None,
) -> object or int:
"""Créer un nouveau justificatif pour l'étudiant"""
# Vérification de non duplication des périodes
justificatifs: list[Justificatif] = etud.justificatifs.all()
date_debut = localize_datetime(date_debut)
date_fin = localize_datetime(date_fin)
justificatifs = [
just
for just in justificatifs
if is_period_overlapping(
(date_debut, date_fin),
(just.date_debut, just.date_fin),
)
]
if len(justificatifs) != 0:
raise ScoValueError(
"Duplication des justificatifs (la période rentrée rentre en conflit avec un justificatif enregistré)"
)
nouv_assiduite = Justificatif(
date_debut=date_debut,
date_fin=date_fin,
etat=etat,
etudiant=etud,
raison=raison,
)
return nouv_assiduite

View File

@ -0,0 +1,90 @@
from app.scodoc.sco_archives import BaseArchiver
import os
class JustificatifArchiver(BaseArchiver):
"""
TOTALK:
- oid -> etudid
- archive_id -> date de création de l'archive (une archive par dépot de document)
justificatif
<dept_id>
<etudid/oid>
<archive_id>
[_description.txt]
[<filename.ext>]
TODO:
- Faire fonction suppression fichier unique dans archive
"""
def __init__(self):
BaseArchiver.__init__(self, archive_type="justificatifs")
def save_justificatif(
self,
etudid: int,
filename: str,
data: bytes or str,
archive_id: str = None,
description: str = "",
):
"""
Ajoute un fichier dans une archive "justificatif" pour l'etudid donné
"""
if archive_id is None:
archive_id: str = self.create_obj_archive(
oid=etudid, description=description
)
else:
archive_id = self._true_archive_id(archive_id, etudid)
self.store(archive_id, filename, data)
def delete_justificatif(self, etudid: int, archive_id: str, filename: str = None):
"""
Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné
"""
if str(etudid) not in self.list_oids():
raise ValueError(f"Aucune archive pour etudid[{etudid}]")
archive_id = self._true_archive_id(archive_id, etudid)
if filename is not None:
if filename not in self.list_archive(archive_id):
raise ValueError(
f"filename inconnu dans l'archive archive_id[{archive_id}] -> etudid[{etudid}]"
)
path: str = os.path.join(self.get_obj_dir(etudid), archive_id, filename)
if os.path.isfile(path):
os.remove(path)
else:
self.delete_archive(
os.path.join(
self.get_obj_dir(etudid),
archive_id,
)
)
def _true_archive_id(self, archive_id: str, etudid: int):
"""
Test si l'archive_id est bien dans le dossier d'archive
Retourne le chemin complet de l'id
"""
archives: list[str] = [
arc for arc in self.list_obj_archives(etudid) if archive_id in arc
]
if len(archives) == 0:
raise ValueError(
f"archive_id[{archive_id}] inconnu pour etudid[{etudid}]",
self.list_obj_archives(etudid),
)
return archives[0]

View File

@ -1,7 +1,7 @@
from datetime import date, datetime, time, timedelta from datetime import date, datetime, time, timedelta
import app.scodoc.sco_utils as scu import app.scodoc.sco_utils as scu
from app.models.assiduites import Assiduite from app.models.assiduites import Assiduite, Justificatif
from app.models.etudiants import Identite from app.models.etudiants import Identite
from app.models.formsemestre import FormSemestre, FormSemestreInscription from app.models.formsemestre import FormSemestre, FormSemestreInscription
@ -15,11 +15,15 @@ def get_assiduites_stats(
if filtered is not None: if filtered is not None:
for key in filtered: for key in filtered:
if key == "etat": if key == "etat":
assiduites = filter_by_etat(assiduites, filtered[key]) assiduites = filter_assiduites_by_etat(assiduites, filtered[key])
elif key == "date_fin": elif key == "date_fin":
assiduites = filter_by_date(assiduites, filtered[key], sup=False) assiduites = filter_assiduites_by_date(
assiduites, filtered[key], sup=False
)
elif key == "date_debut": elif key == "date_debut":
assiduites = filter_by_date(assiduites, filtered[key], sup=True) assiduites = filter_assiduites_by_date(
assiduites, filtered[key], sup=True
)
elif key == "moduleimpl_id": elif key == "moduleimpl_id":
assiduites = filter_by_module_impl(assiduites, filtered[key]) assiduites = filter_by_module_impl(assiduites, filtered[key])
elif key == "formsemestre": elif key == "formsemestre":
@ -76,7 +80,7 @@ def get_count(assiduites: Assiduite) -> dict[str, int or float]:
return output return output
def filter_by_etat(assiduites: Assiduite, etat: str) -> Assiduite: def filter_assiduites_by_etat(assiduites: Assiduite, etat: str) -> Assiduite:
""" """
Filtrage d'une collection d'assiduites en fonction de leur état Filtrage d'une collection d'assiduites en fonction de leur état
""" """
@ -85,7 +89,7 @@ def filter_by_etat(assiduites: Assiduite, etat: str) -> Assiduite:
return assiduites.filter(Assiduite.etat.in_(etats)) return assiduites.filter(Assiduite.etat.in_(etats))
def filter_by_date( def filter_assiduites_by_date(
assiduites: Assiduite, date_: datetime, sup: bool = True assiduites: Assiduite, date_: datetime, sup: bool = True
) -> Assiduite: ) -> Assiduite:
""" """
@ -106,6 +110,38 @@ def filter_by_date(
return assiduites.filter(Assiduite.date_fin <= date_) return assiduites.filter(Assiduite.date_fin <= date_)
def filter_justificatifs_by_etat(
justificatifs: Justificatif, etat: str
) -> Justificatif:
"""
Filtrage d'une collection de justificatifs en fonction de leur état
"""
etats: list[str] = list(etat.split(","))
etats = [scu.ETATS_JUSTIFICATIF.get(e, -1) for e in etats]
return justificatifs.filter(Justificatif.etat.in_(etats))
def filter_justificatifs_by_date(
justificatifs: Justificatif, date_: datetime, sup: bool = True
) -> Assiduite:
"""
Filtrage d'une collection d'assiduites en fonction d'une date
Sup == True -> les assiduites doivent débuter après 'date'\n
Sup == False -> les assiduites doivent finir avant 'date'
"""
if date_.tzinfo is None:
first_justificatif: Justificatif = justificatifs.first()
if first_justificatif is not None:
date_: datetime = date_.replace(tzinfo=first_justificatif.date_debut.tzinfo)
if sup:
return justificatifs.filter(Justificatif.date_debut >= date_)
return justificatifs.filter(Justificatif.date_fin <= date_)
def filter_by_module_impl( def filter_by_module_impl(
assiduites: Assiduite, module_impl_id: int or None assiduites: Assiduite, module_impl_id: int or None
) -> Assiduite: ) -> Assiduite:

View File

@ -128,6 +128,13 @@ ETAT_JUSTIFICATIF_NAME = {
EtatJustificatif.MODIFIE: "modifié", EtatJustificatif.MODIFIE: "modifié",
} }
ETATS_JUSTIFICATIF = {
"validé": EtatJustificatif.VALIDE,
"non vaidé": EtatJustificatif.NON_VALIDE,
"en attente": EtatJustificatif.ATTENTE,
"modifié": EtatJustificatif.MODIFIE,
}
def is_iso_formated(date: str, convert=False) -> bool or datetime.datetime or None: def is_iso_formated(date: str, convert=False) -> bool or datetime.datetime or None:
""" """

View File

@ -1,8 +1,8 @@
"""Modeles assiduites et justificatifs """modèles assiduites justificatifs
Revision ID: 961b2f2c595d Revision ID: dbcf2175e87f
Revises: 5c7b208355df Revises: 5c7b208355df
Create Date: 2023-01-31 15:37:02.961533 Create Date: 2023-02-01 14:21:06.989190
""" """
from alembic import op from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '961b2f2c595d' revision = 'dbcf2175e87f'
down_revision = '5c7b208355df' down_revision = '5c7b208355df'
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -19,7 +19,7 @@ depends_on = None
def upgrade(): def upgrade():
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.create_table('justificatifs', op.create_table('justificatifs',
sa.Column('justif_id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('date_debut', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), sa.Column('date_debut', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
sa.Column('date_fin', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False), sa.Column('date_fin', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False),
sa.Column('etudid', sa.Integer(), nullable=False), sa.Column('etudid', sa.Integer(), nullable=False),
@ -28,7 +28,7 @@ def upgrade():
sa.Column('raison', sa.Text(), nullable=True), sa.Column('raison', sa.Text(), nullable=True),
sa.Column('fichier', sa.Text(), nullable=True), sa.Column('fichier', sa.Text(), nullable=True),
sa.ForeignKeyConstraint(['etudid'], ['identite.id'], ondelete='CASCADE'), sa.ForeignKeyConstraint(['etudid'], ['identite.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('justif_id') sa.PrimaryKeyConstraint('id')
) )
op.create_index(op.f('ix_justificatifs_etudid'), 'justificatifs', ['etudid'], unique=False) op.create_index(op.f('ix_justificatifs_etudid'), 'justificatifs', ['etudid'], unique=False)
op.create_table('assiduites', op.create_table('assiduites',

View File

@ -0,0 +1,255 @@
"""
Test de l'api Assiduité
Ecrit par HARTMANN Matthias
"""
from random import randint
from tests.api.setup_test_api import GET, POST_JSON, APIError, api_headers
ETUDID = 1
FAUX = 42069
JUSTIFICATIFS_FIELDS = {
"justif_id": int,
"etudid": int,
"date_debut": str,
"date_fin": str,
"etat": str,
"raison": str,
"entry_date": str,
"fichier": str,
}
CREATE_FIELD = {"justif_id": int}
BATCH_FIELD = {"errors": dict, "success": dict}
TO_REMOVE = []
def check_fields(data, fields: dict = None):
if fields is None:
fields = JUSTIFICATIFS_FIELDS
assert set(data.keys()) == set(fields.keys())
for key in data:
if key in ("raison", "fichier"):
assert isinstance(data[key], fields[key]) or data[key] is None
else:
assert isinstance(data[key], fields[key])
def check_failure_get(path, headers, err=None):
try:
GET(path=path, headers=headers)
# ^ Renvoi un 404
except APIError as api_err:
if err is not None:
assert api_err.payload["message"] == err
else:
raise APIError("Le GET n'aurait pas du fonctionner")
def check_failure_post(path, headers, data, err=None):
try:
data = POST_JSON(path=path, headers=headers, data=data)
# ^ Renvoi un 404
except APIError as api_err:
if err is not None:
assert api_err.payload["message"] == err
else:
raise APIError("Le GET n'aurait pas du fonctionner")
def create_data(etat: str, day: str, raison: str = None):
data = {
"date_debut": f"2022-01-{day}T08:00",
"date_fin": f"2022-01-{day}T10:00",
"etat": etat,
}
if raison is not None:
data["desc"] = raison
return data
def test_route_justificatif(api_headers):
# Bon fonctionnement == id connu
data = GET(path="/justificatif/1", headers=api_headers)
check_fields(data)
# Mauvais Fonctionnement == id inconnu
check_failure_get(
f"/justificatif/{FAUX}",
api_headers,
)
def test_route_justificatifs(api_headers):
# Bon fonctionnement
data = GET(path=f"/justificatifs/{ETUDID}", headers=api_headers)
assert isinstance(data, list)
for just in data:
check_fields(just, JUSTIFICATIFS_FIELDS)
data = GET(path=f"/justificatifs/{ETUDID}/query?", headers=api_headers)
assert isinstance(data, list)
for just in data:
check_fields(just, JUSTIFICATIFS_FIELDS)
# Mauvais fonctionnement
check_failure_get(f"/justificatifs/{FAUX}", api_headers)
check_failure_get(f"/justificatifs/{FAUX}/query?", api_headers)
def test_route_create(api_headers):
# -== Unique ==-
# Bon fonctionnement
data = create_data("validé", "01")
res = POST_JSON(f"/justificatif/{ETUDID}/create", [data], api_headers)
check_fields(res, BATCH_FIELD)
assert len(res["success"]) == 1
TO_REMOVE.append(res["success"]["0"]["justif_id"])
data2 = create_data("modifié", "02", "raison")
res = POST_JSON(f"/justificatif/{ETUDID}/create", [data2], api_headers)
check_fields(res, BATCH_FIELD)
assert len(res["success"]) == 1
TO_REMOVE.append(res["success"]["0"]["justif_id"])
# Mauvais fonctionnement
check_failure_post(f"/justificatif/{FAUX}/create", api_headers, [data])
res = POST_JSON(f"/justificatif/{ETUDID}/create", [data], api_headers)
check_fields(res, BATCH_FIELD)
assert len(res["errors"]) == 1
assert (
res["errors"]["0"]
== "Duplication des justificatifs (la période rentrée rentre en conflit avec un justificatif enregistré)"
)
res = POST_JSON(
f"/justificatif/{ETUDID}/create",
[create_data("absent", "03")],
api_headers,
)
check_fields(res, BATCH_FIELD)
assert len(res["errors"]) == 1
assert res["errors"]["0"] == "param 'etat': invalide"
# -== Multiple ==-
# Bon Fonctionnement
etats = ["validé", "modifé", "non validé", "en attente"]
data = [
create_data(etats[d % 4], 10 + d, "raison" if d % 2 else None)
for d in range(randint(3, 5))
]
res = POST_JSON(f"/justificatif/{ETUDID}/create", data, api_headers)
check_fields(res, BATCH_FIELD)
for dat in res["success"]:
check_fields(res["success"][dat], CREATE_FIELD)
TO_REMOVE.append(res["success"][dat]["justif_id"])
# Mauvais Fonctionnement
data2 = [
create_data("modifié", "01"),
create_data(None, "25"),
create_data("blabla", 26),
create_data("validé", 32),
]
res = POST_JSON(f"/justificatif/{ETUDID}/create", data2, api_headers)
check_fields(res, BATCH_FIELD)
assert len(res["errors"]) == 4
assert (
res["errors"]["0"]
== "Duplication des justificatifs (la période rentrée rentre en conflit avec un justificatif enregistré)"
)
assert res["errors"]["1"] == "param 'etat': manquant"
assert res["errors"]["2"] == "param 'etat': invalide"
assert (
res["errors"]["3"]
== "param 'date_debut': format invalide, param 'date_fin': format invalide"
)
def test_route_edit(api_headers):
# Bon fonctionnement
data = {"etat": "modifié", "raison": "test"}
res = POST_JSON(f"/justificatif/{TO_REMOVE[0]}/edit", data, api_headers)
assert res == {"OK": True}
data["raison"] = None
res = POST_JSON(f"/justificatif/{TO_REMOVE[1]}/edit", data, api_headers)
assert res == {"OK": True}
# Mauvais fonctionnement
check_failure_post(f"/justificatif/{FAUX}/edit", api_headers, data)
data["etat"] = "blabla"
check_failure_post(
f"/justificatif/{TO_REMOVE[2]}/edit",
api_headers,
data,
err="param 'etat': invalide",
)
def test_route_delete(api_headers):
# -== Unique ==-
# Bon fonctionnement
data = TO_REMOVE[0]
res = POST_JSON("/justificatif/delete", [data], api_headers)
check_fields(res, BATCH_FIELD)
for dat in res["success"]:
assert res["success"][dat] == {"OK": True}
# Mauvais fonctionnement
res = POST_JSON("/justificatif/delete", [data], api_headers)
check_fields(res, BATCH_FIELD)
assert len(res["errors"]) == 1
# -== Multiple ==-
# Bon Fonctionnement
data = TO_REMOVE[1:]
res = POST_JSON("/justificatif/delete", data, api_headers)
check_fields(res, BATCH_FIELD)
for dat in res["success"]:
assert res["success"][dat] == {"OK": True}
# Mauvais Fonctionnement
data2 = [
FAUX,
FAUX + 1,
FAUX + 2,
]
res = POST_JSON("/justificatif/delete", data2, api_headers)
check_fields(res, BATCH_FIELD)
assert len(res["errors"]) == 3
assert all([res["errors"][i] == "Justificatif non existant" for i in res["errors"]])

View File

@ -60,6 +60,7 @@ def test_permissions(api_headers):
"uid": 1, "uid": 1,
"version": "long", "version": "long",
"assiduite_id": 1, "assiduite_id": 1,
"justif_id": 1,
} }
for rule in api_rules: for rule in api_rules:
path = rule.build(args)[1] path = rule.build(args)[1]

View File

@ -142,7 +142,7 @@ def editer_supprimer_assiduiter(etuds: list[Identite], moduleimpls: list[int]):
# Vérification du changement # Vérification du changement
assert ( assert (
scass.filter_by_etat(etuds[0].assiduites, "retard").count() == 3 scass.filter_assiduites_by_etat(etuds[0].assiduites, "retard").count() == 3
), "Edition d'assiduité mauvais" ), "Edition d'assiduité mauvais"
assert ( assert (
scass.filter_by_module_impl(etuds[1].assiduites, moduleimpls[0].id).count() == 2 scass.filter_by_module_impl(etuds[1].assiduites, moduleimpls[0].id).count() == 2
@ -294,22 +294,25 @@ def verifier_comptage_et_filtrage(
# Etat # Etat
assert ( assert (
scass.filter_by_etat(etu2.assiduites, "present").count() == 2 scass.filter_assiduites_by_etat(etu2.assiduites, "present").count() == 2
), "Filtrage de l'état 'présent' mauvais" ), "Filtrage de l'état 'présent' mauvais"
assert ( assert (
scass.filter_by_etat(etu2.assiduites, "retard").count() == 2 scass.filter_assiduites_by_etat(etu2.assiduites, "retard").count() == 2
), "Filtrage de l'état 'retard' mauvais" ), "Filtrage de l'état 'retard' mauvais"
assert ( assert (
scass.filter_by_etat(etu2.assiduites, "absent").count() == 2 scass.filter_assiduites_by_etat(etu2.assiduites, "absent").count() == 2
), "Filtrage de l'état 'absent' mauvais" ), "Filtrage de l'état 'absent' mauvais"
assert ( assert (
scass.filter_by_etat(etu2.assiduites, "absent,retard").count() == 4 scass.filter_assiduites_by_etat(etu2.assiduites, "absent,retard").count() == 4
), "Filtrage de l'état 'absent,retard' mauvais" ), "Filtrage de l'état 'absent,retard' mauvais"
assert ( assert (
scass.filter_by_etat(etu2.assiduites, "absent,retard,present").count() == 6 scass.filter_assiduites_by_etat(
etu2.assiduites, "absent,retard,present"
).count()
== 6
), "Filtrage de l'état 'absent,retard,present' mauvais" ), "Filtrage de l'état 'absent,retard,present' mauvais"
assert ( assert (
scass.filter_by_etat(etu2.assiduites, "autre").count() == 0 scass.filter_assiduites_by_etat(etu2.assiduites, "autre").count() == 0
), "Filtrage de l'état 'autre' mauvais" ), "Filtrage de l'état 'autre' mauvais"
# Module # Module
@ -349,39 +352,39 @@ def verifier_comptage_et_filtrage(
# Date début # Date début
date = scu.localize_datetime("2022-09-01T10:00+01:00") date = scu.localize_datetime("2022-09-01T10:00+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=True).count() == 6 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=True).count() == 6
), "Filtrage 'Date début' mauvais" ), "Filtrage 'Date début' mauvais"
date = scu.localize_datetime("2022-09-03T10:00:00+01:00") date = scu.localize_datetime("2022-09-03T10:00:00+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=True).count() == 5 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=True).count() == 5
), "Filtrage 'Date début' mauvais" ), "Filtrage 'Date début' mauvais"
date = scu.localize_datetime("2022-09-03T10:00:01+01:00") date = scu.localize_datetime("2022-09-03T10:00:01+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=True).count() == 5 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=True).count() == 5
), "Filtrage 'Date début' mauvais" ), "Filtrage 'Date début' mauvais"
date = scu.localize_datetime("2022-09-03T10:00:02+01:00") date = scu.localize_datetime("2022-09-03T10:00:02+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=True).count() == 4 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=True).count() == 4
), "Filtrage 'Date début' mauvais" ), "Filtrage 'Date début' mauvais"
# Date fin # Date fin
date = scu.localize_datetime("2022-09-01T10:00+01:00") date = scu.localize_datetime("2022-09-01T10:00+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=False).count() == 0 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=False).count() == 0
), "Filtrage 'Date fin' mauvais" ), "Filtrage 'Date fin' mauvais"
date = scu.localize_datetime("2022-09-03T10:00:00+01:00") date = scu.localize_datetime("2022-09-03T10:00:00+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=False).count() == 1 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=False).count() == 1
), "Filtrage 'Date fin' mauvais" ), "Filtrage 'Date fin' mauvais"
date = scu.localize_datetime("2022-09-03T10:00:01+01:00") date = scu.localize_datetime("2022-09-03T10:00:01+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=False).count() == 1 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=False).count() == 1
), "Filtrage 'Date fin' mauvais" ), "Filtrage 'Date fin' mauvais"
date = scu.localize_datetime("2023-01-04T13:00:01+01:00") date = scu.localize_datetime("2023-01-04T13:00:01+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=False).count() == 6 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=False).count() == 6
), "Filtrage 'Date fin' mauvais" ), "Filtrage 'Date fin' mauvais"
date = scu.localize_datetime("2023-01-03T11:00:01+01:00") date = scu.localize_datetime("2023-01-03T11:00:01+01:00")
assert ( assert (
scass.filter_by_date(etu2.assiduites, date, sup=False).count() == 4 scass.filter_assiduites_by_date(etu2.assiduites, date, sup=False).count() == 4
), "Filtrage 'Date fin' mauvais" ), "Filtrage 'Date fin' mauvais"

View File

@ -11,14 +11,12 @@ import datetime
import os import os
import random import random
import shutil import shutil
import time
import sys import sys
import time
from app import db from app import db, models
from app.auth.models import Role, User from app.auth.models import Role, User
from app.but.import_refcomp import orebut_import_refcomp from app.but.import_refcomp import orebut_import_refcomp
from app import models
from app.models import departements
from app.models import ( from app.models import (
Absence, Absence,
Assiduite, Assiduite,
@ -27,8 +25,10 @@ from app.models import (
FormSemestre, FormSemestre,
FormSemestreEtape, FormSemestreEtape,
Identite, Identite,
Justificatif,
ModuleImpl, ModuleImpl,
NotesNotes, NotesNotes,
departements,
) )
from app.scodoc import ( from app.scodoc import (
sco_cache, sco_cache,
@ -38,6 +38,7 @@ from app.scodoc import (
sco_groups, sco_groups,
) )
from app.scodoc.sco_permissions import Permission from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_utils import localize_datetime
from tools.fakeportal.gen_nomprenoms import nomprenom from tools.fakeportal.gen_nomprenoms import nomprenom
random.seed(12345678) # tests reproductibles random.seed(12345678) # tests reproductibles
@ -379,13 +380,12 @@ def create_logos():
) )
def ajouter_assiduites(formsemestre: FormSemestre): def ajouter_assiduites_justificatifs(formsemestre: FormSemestre):
""" """
Ajoute des assiduités semi-aléatoires à chaque étudiant du semestre Ajoute des assiduités semi-aléatoires à chaque étudiant du semestre
""" """
MODS = [moduleimpl for moduleimpl in formsemestre.modimpls] MODS = [moduleimpl for moduleimpl in formsemestre.modimpls]
MODS.append(None) MODS.append(None)
from app.scodoc.sco_utils import localize_datetime
for etud in formsemestre.etuds: for etud in formsemestre.etuds:
@ -408,6 +408,26 @@ def ajouter_assiduites(formsemestre: FormSemestre):
db.session.add(code) db.session.add(code)
for i in range(random.randint(0, 2)):
etat = random.randint(0, 3)
deb_date = base_date + datetime.timedelta(days=i)
fin_date = deb_date + datetime.timedelta(hours=8)
raison = random.choice(["raison", None])
code = Justificatif.create_justificatif(
etud=etud,
date_debut=deb_date,
date_fin=fin_date,
etat=etat,
raison=raison,
)
assert isinstance(
code, Justificatif
), "Erreur dans la génération des justificatifs"
db.session.add(code)
db.session.commit() db.session.commit()
@ -431,7 +451,7 @@ def init_test_database():
saisie_notes_evaluations(formsemestre, user_lecteur) saisie_notes_evaluations(formsemestre, user_lecteur)
add_absences(formsemestre) add_absences(formsemestre)
create_etape_apo(formsemestre) create_etape_apo(formsemestre)
ajouter_assiduites(formsemestre) ajouter_assiduites_justificatifs(formsemestre)
create_logos() create_logos()
# à compléter # à compléter
# - groupes # - groupes