Assiduites : Fonctionnement BackEnd + API
This commit is contained in:
parent
c5fb15fbe8
commit
9a0852917f
@ -2,7 +2,8 @@
|
||||
"""
|
||||
|
||||
from flask import Blueprint
|
||||
from flask import request
|
||||
from flask import request, g, jsonify
|
||||
from app import db
|
||||
from app.scodoc import sco_utils as scu
|
||||
from app.scodoc.sco_exceptions import ScoException
|
||||
|
||||
@ -34,9 +35,26 @@ def requested_format(default_format="json", allowed_formats=None):
|
||||
return None
|
||||
|
||||
|
||||
def get_model_api_object(model_cls: db.Model, model_id: int, join_cls: db.Model = None):
|
||||
"""
|
||||
Retourne une réponse contenant la représentation api de l'objet "Model[model_id]"
|
||||
|
||||
Filtrage du département en fonction d'une classe de jointure (eg: Identite, Formsemstre) -> join_cls
|
||||
|
||||
exemple d'utilisation : fonction "justificatif()" -> app/api/justificatifs.py
|
||||
"""
|
||||
query = model_cls.query.filter_by(id=model_id)
|
||||
if g.scodoc_dept and join_cls is not None:
|
||||
query = query.join(join_cls).filter_by(dept_id=g.scodoc_dept_id)
|
||||
unique: model_cls = query.first_or_404()
|
||||
|
||||
return jsonify(unique.to_dict(format_api=True))
|
||||
|
||||
|
||||
from app.api import tokens
|
||||
from app.api import (
|
||||
absences,
|
||||
assiduites,
|
||||
billets_absences,
|
||||
departements,
|
||||
etudiants,
|
||||
@ -44,6 +62,7 @@ from app.api import (
|
||||
formations,
|
||||
formsemestres,
|
||||
jury,
|
||||
justificatifs,
|
||||
logos,
|
||||
partitions,
|
||||
semset,
|
||||
|
868
app/api/assiduites.py
Normal file
868
app/api/assiduites.py
Normal file
@ -0,0 +1,868 @@
|
||||
##############################################################################
|
||||
# ScoDoc
|
||||
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
|
||||
# See LICENSE
|
||||
##############################################################################
|
||||
"""ScoDoc 9 API : Assiduités
|
||||
"""
|
||||
from datetime import datetime
|
||||
from flask import g, jsonify, request
|
||||
from flask_login import login_required, current_user
|
||||
|
||||
import app.scodoc.sco_assiduites as scass
|
||||
import app.scodoc.sco_utils as scu
|
||||
from app import db
|
||||
from app.api import api_bp as bp
|
||||
from app.api import api_web_bp
|
||||
from app.api import get_model_api_object
|
||||
from app.decorators import permission_required, scodoc
|
||||
from app.models import Assiduite, FormSemestre, Identite, ModuleImpl
|
||||
from app.scodoc.sco_exceptions import ScoValueError
|
||||
from app.scodoc.sco_permissions import Permission
|
||||
from app.scodoc.sco_utils import json_error
|
||||
|
||||
|
||||
@bp.route("/assiduite/<int:assiduite_id>")
|
||||
@api_web_bp.route("/assiduite/<int:assiduite_id>")
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def assiduite(assiduite_id: int = None):
|
||||
"""Retourne un objet assiduité à partir de son id
|
||||
|
||||
Exemple de résultat:
|
||||
{
|
||||
"assiduite_id": 1,
|
||||
"etudid": 2,
|
||||
"moduleimpl_id": 3,
|
||||
"date_debut": "2022-10-31T08:00+01:00",
|
||||
"date_fin": "2022-10-31T10:00+01:00",
|
||||
"etat": "retard",
|
||||
"desc": "une description",
|
||||
"user_id: 1 or null,
|
||||
"est_just": False or True,
|
||||
}
|
||||
"""
|
||||
|
||||
return get_model_api_object(Assiduite, assiduite_id, Identite)
|
||||
|
||||
|
||||
@bp.route("/assiduites/<int:etudid>/count", defaults={"with_query": False})
|
||||
@bp.route("/assiduites/<int:etudid>/count/query", defaults={"with_query": True})
|
||||
@api_web_bp.route("/assiduites/<int:etudid>/count", defaults={"with_query": False})
|
||||
@api_web_bp.route("/assiduites/<int:etudid>/count/query", defaults={"with_query": True})
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def count_assiduites(etudid: int = None, with_query: bool = False):
|
||||
"""
|
||||
Retourne le nombre d'assiduités d'un étudiant
|
||||
chemin : /assiduites/<int:etudid>/count
|
||||
|
||||
Un filtrage peut être donné avec une query
|
||||
chemin : /assiduites/<int:etudid>/count/query?
|
||||
|
||||
Les différents filtres :
|
||||
Type (type de comptage -> journee, demi, heure, nombre d'assiduite):
|
||||
query?type=(journee, demi, heure) -> une seule valeur parmis les trois
|
||||
ex: .../query?type=heure
|
||||
Comportement par défaut : compte le nombre d'assiduité enregistrée
|
||||
|
||||
Etat (etat de l'étudiant -> absent, present ou retard):
|
||||
query?etat=[- liste des états séparé par une virgule -]
|
||||
ex: .../query?etat=present,retard
|
||||
Date debut
|
||||
(date de début de l'assiduité, sont affichés les assiduités
|
||||
dont la date de début est supérieur ou égale à la valeur donnée):
|
||||
query?date_debut=[- date au format iso -]
|
||||
ex: query?date_debut=2022-11-03T08:00+01:00
|
||||
Date fin
|
||||
(date de fin de l'assiduité, sont affichés les assiduités
|
||||
dont la date de fin est inférieure ou égale à la valeur donnée):
|
||||
query?date_fin=[- date au format iso -]
|
||||
ex: query?date_fin=2022-11-03T10:00+01:00
|
||||
Moduleimpl_id (l'id du module concerné par l'assiduité):
|
||||
query?moduleimpl_id=[- int ou vide -]
|
||||
ex: query?moduleimpl_id=1234
|
||||
query?moduleimpl_od=
|
||||
Formsemstre_id (l'id du formsemestre concerné par l'assiduité)
|
||||
query?formsemestre_id=[int]
|
||||
ex query?formsemestre_id=3
|
||||
user_id (l'id de l'auteur de l'assiduité)
|
||||
query?user_id=[int]
|
||||
ex query?user_id=3
|
||||
est_just (si l'assiduité est justifié (fait aussi filtre par abs/retard))
|
||||
query?est_just=[bool]
|
||||
query?est_just=f
|
||||
query?est_just=t
|
||||
|
||||
|
||||
|
||||
"""
|
||||
query = Identite.query.filter_by(id=etudid)
|
||||
if g.scodoc_dept:
|
||||
query = query.filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
etud: Identite = query.first_or_404(etudid)
|
||||
filtered: dict[str, object] = {}
|
||||
metric: str = "all"
|
||||
|
||||
if with_query:
|
||||
metric, filtered = _count_manager(request)
|
||||
|
||||
return jsonify(
|
||||
scass.get_assiduites_stats(
|
||||
assiduites=etud.assiduites, metric=metric, filtered=filtered
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/assiduites/<int:etudid>", defaults={"with_query": False})
|
||||
@bp.route("/assiduites/<int:etudid>/query", defaults={"with_query": True})
|
||||
@api_web_bp.route("/assiduites/<int:etudid>", defaults={"with_query": False})
|
||||
@api_web_bp.route("/assiduites/<int:etudid>/query", defaults={"with_query": True})
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def assiduites(etudid: int = None, with_query: bool = False):
|
||||
"""
|
||||
Retourne toutes les assiduités d'un étudiant
|
||||
chemin : /assiduites/<int:etudid>
|
||||
|
||||
Un filtrage peut être donné avec une query
|
||||
chemin : /assiduites/<int:etudid>/query?
|
||||
|
||||
Les différents filtres :
|
||||
Etat (etat de l'étudiant -> absent, present ou retard):
|
||||
query?etat=[- liste des états séparé par une virgule -]
|
||||
ex: .../query?etat=present,retard
|
||||
Date debut
|
||||
(date de début de l'assiduité, sont affichés les assiduités
|
||||
dont la date de début est supérieur ou égale à la valeur donnée):
|
||||
query?date_debut=[- date au format iso -]
|
||||
ex: query?date_debut=2022-11-03T08:00+01:00
|
||||
Date fin
|
||||
(date de fin de l'assiduité, sont affichés les assiduités
|
||||
dont la date de fin est inférieure ou égale à la valeur donnée):
|
||||
query?date_fin=[- date au format iso -]
|
||||
ex: query?date_fin=2022-11-03T10:00+01:00
|
||||
Moduleimpl_id (l'id du module concerné par l'assiduité):
|
||||
query?moduleimpl_id=[- int ou vide -]
|
||||
ex: query?moduleimpl_id=1234
|
||||
query?moduleimpl_od=
|
||||
Formsemstre_id (l'id du formsemestre concerné par l'assiduité)
|
||||
query?formsemstre_id=[int]
|
||||
ex query?formsemestre_id=3
|
||||
user_id (l'id de l'auteur de l'assiduité)
|
||||
query?user_id=[int]
|
||||
ex query?user_id=3
|
||||
est_just (si l'assiduité est justifié (fait aussi filtre par abs/retard))
|
||||
query?est_just=[bool]
|
||||
query?est_just=f
|
||||
query?est_just=t
|
||||
|
||||
|
||||
"""
|
||||
|
||||
query = Identite.query.filter_by(id=etudid)
|
||||
if g.scodoc_dept:
|
||||
query = query.filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
etud: Identite = query.first_or_404(etudid)
|
||||
assiduites_query = etud.assiduites
|
||||
|
||||
if with_query:
|
||||
assiduites_query = _filter_manager(request, assiduites_query)
|
||||
|
||||
data_set: list[dict] = []
|
||||
for ass in assiduites_query.all():
|
||||
data = ass.to_dict(format_api=True)
|
||||
data_set.append(data)
|
||||
|
||||
return jsonify(data_set)
|
||||
|
||||
|
||||
@bp.route("/assiduites/group/query", defaults={"with_query": True})
|
||||
@api_web_bp.route("/assiduites/group/query", defaults={"with_query": True})
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def assiduites_group(with_query: bool = False):
|
||||
"""
|
||||
Retourne toutes les assiduités d'un groupe d'étudiants
|
||||
chemin : /assiduites/group/query?etudids=1,2,3
|
||||
|
||||
Un filtrage peut être donné avec une query
|
||||
chemin : /assiduites/group/query?etudids=1,2,3
|
||||
|
||||
Les différents filtres :
|
||||
Etat (etat de l'étudiant -> absent, present ou retard):
|
||||
query?etat=[- liste des états séparé par une virgule -]
|
||||
ex: .../query?etat=present,retard
|
||||
Date debut
|
||||
(date de début de l'assiduité, sont affichés les assiduités
|
||||
dont la date de début est supérieur ou égale à la valeur donnée):
|
||||
query?date_debut=[- date au format iso -]
|
||||
ex: query?date_debut=2022-11-03T08:00+01:00
|
||||
Date fin
|
||||
(date de fin de l'assiduité, sont affichés les assiduités
|
||||
dont la date de fin est inférieure ou égale à la valeur donnée):
|
||||
query?date_fin=[- date au format iso -]
|
||||
ex: query?date_fin=2022-11-03T10:00+01:00
|
||||
Moduleimpl_id (l'id du module concerné par l'assiduité):
|
||||
query?moduleimpl_id=[- int ou vide -]
|
||||
ex: query?moduleimpl_id=1234
|
||||
query?moduleimpl_od=
|
||||
Formsemstre_id (l'id du formsemestre concerné par l'assiduité)
|
||||
query?formsemstre_id=[int]
|
||||
ex query?formsemestre_id=3
|
||||
user_id (l'id de l'auteur de l'assiduité)
|
||||
query?user_id=[int]
|
||||
ex query?user_id=3
|
||||
est_just (si l'assiduité est justifié (fait aussi filtre par abs/retard))
|
||||
query?est_just=[bool]
|
||||
query?est_just=f
|
||||
query?est_just=t
|
||||
|
||||
|
||||
"""
|
||||
|
||||
etuds = request.args.get("etudids", "")
|
||||
etuds = etuds.split(",")
|
||||
try:
|
||||
etuds = [int(etu) for etu in etuds]
|
||||
except ValueError:
|
||||
return json_error(404, "Le champs etudids n'est pas correctement formé")
|
||||
|
||||
query = Identite.query.filter(Identite.id.in_(etuds))
|
||||
if g.scodoc_dept:
|
||||
query = query.filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
if len(etuds) != query.count() or len(etuds) == 0:
|
||||
return json_error(
|
||||
404,
|
||||
"Tous les étudiants ne sont pas dans le même département et/ou n'existe pas.",
|
||||
)
|
||||
assiduites_query = Assiduite.query.filter(Assiduite.etudid.in_(etuds))
|
||||
|
||||
if with_query:
|
||||
assiduites_query = _filter_manager(request, assiduites_query)
|
||||
|
||||
data_set: dict[list[dict]] = {key: [] for key in etuds}
|
||||
for ass in assiduites_query.all():
|
||||
data = ass.to_dict(format_api=True)
|
||||
data_set.get(data["etudid"]).append(data)
|
||||
|
||||
return jsonify(data_set)
|
||||
|
||||
|
||||
@bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>", defaults={"with_query": False}
|
||||
)
|
||||
@api_web_bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>", defaults={"with_query": False}
|
||||
)
|
||||
@bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>/query",
|
||||
defaults={"with_query": True},
|
||||
)
|
||||
@api_web_bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>/query",
|
||||
defaults={"with_query": True},
|
||||
)
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def assiduites_formsemestre(formsemestre_id: int, with_query: bool = False):
|
||||
"""Retourne toutes les assiduités du formsemestre"""
|
||||
formsemestre: FormSemestre = None
|
||||
formsemestre_id = int(formsemestre_id)
|
||||
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()
|
||||
|
||||
if formsemestre is None:
|
||||
return json_error(404, "le paramètre 'formsemestre_id' n'existe pas")
|
||||
|
||||
assiduites_query = scass.filter_by_formsemestre(Assiduite.query, formsemestre)
|
||||
|
||||
if with_query:
|
||||
assiduites_query = _filter_manager(request, assiduites_query)
|
||||
|
||||
data_set: list[dict] = []
|
||||
for ass in assiduites_query.all():
|
||||
data = ass.to_dict(format_api=True)
|
||||
data_set.append(data)
|
||||
|
||||
return jsonify(data_set)
|
||||
|
||||
|
||||
@bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>/count",
|
||||
defaults={"with_query": False},
|
||||
)
|
||||
@api_web_bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>/count",
|
||||
defaults={"with_query": False},
|
||||
)
|
||||
@bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>/count/query",
|
||||
defaults={"with_query": True},
|
||||
)
|
||||
@api_web_bp.route(
|
||||
"/assiduites/formsemestre/<int:formsemestre_id>/count/query",
|
||||
defaults={"with_query": True},
|
||||
)
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def count_assiduites_formsemestre(
|
||||
formsemestre_id: int = None, with_query: bool = False
|
||||
):
|
||||
"""Comptage des assiduités du formsemestre"""
|
||||
formsemestre: FormSemestre = None
|
||||
formsemestre_id = int(formsemestre_id)
|
||||
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()
|
||||
|
||||
if formsemestre is None:
|
||||
return json_error(404, "le paramètre 'formsemestre_id' n'existe pas")
|
||||
|
||||
etuds = formsemestre.etuds.all()
|
||||
etuds_id = [etud.id for etud in etuds]
|
||||
|
||||
assiduites_query = Assiduite.query.filter(Assiduite.etudid.in_(etuds_id))
|
||||
assiduites_query = scass.filter_by_formsemestre(assiduites_query, formsemestre)
|
||||
metric: str = "all"
|
||||
filtered: dict = {}
|
||||
if with_query:
|
||||
metric, filtered = _count_manager(request)
|
||||
|
||||
return jsonify(scass.get_assiduites_stats(assiduites_query, metric, filtered))
|
||||
|
||||
|
||||
@bp.route("/assiduite/<int:etudid>/create", methods=["POST"])
|
||||
@api_web_bp.route("/assiduite/<int:etudid>/create", methods=["POST"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def assiduite_create(etudid: int = None):
|
||||
"""
|
||||
Création d'une assiduité pour l'étudiant (etudid)
|
||||
La requête doit avoir un content type "application/json":
|
||||
[
|
||||
{
|
||||
"date_debut": str,
|
||||
"date_fin": str,
|
||||
"etat": str,
|
||||
},
|
||||
{
|
||||
"date_debut": str,
|
||||
"date_fin": str,
|
||||
"etat": str,
|
||||
"moduleimpl_id": int,
|
||||
"desc":str,
|
||||
}
|
||||
...
|
||||
]
|
||||
|
||||
"""
|
||||
etud: Identite = Identite.query.filter_by(id=etudid).first_or_404()
|
||||
|
||||
create_list: list[object] = request.get_json(force=True)
|
||||
|
||||
if not isinstance(create_list, list):
|
||||
return json_error(404, "Le contenu envoyé n'est pas une liste")
|
||||
|
||||
errors: dict[int, str] = {}
|
||||
success: dict[int, object] = {}
|
||||
for i, data in enumerate(create_list):
|
||||
code, obj = _create_singular(data, etud)
|
||||
if code == 404:
|
||||
errors[i] = obj
|
||||
else:
|
||||
success[i] = obj
|
||||
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({"errors": errors, "success": success})
|
||||
|
||||
|
||||
@bp.route("/assiduites/create", methods=["POST"])
|
||||
@api_web_bp.route("/assiduites/create", methods=["POST"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def assiduites_create():
|
||||
"""
|
||||
Création d'une assiduité ou plusieurs assiduites
|
||||
La requête doit avoir un content type "application/json":
|
||||
[
|
||||
{
|
||||
"date_debut": str,
|
||||
"date_fin": str,
|
||||
"etat": str,
|
||||
"etudid":int,
|
||||
},
|
||||
{
|
||||
"date_debut": str,
|
||||
"date_fin": str,
|
||||
"etat": str,
|
||||
"etudid":int,
|
||||
|
||||
"moduleimpl_id": int,
|
||||
"desc":str,
|
||||
}
|
||||
...
|
||||
]
|
||||
|
||||
"""
|
||||
|
||||
create_list: list[object] = request.get_json(force=True)
|
||||
|
||||
if not isinstance(create_list, list):
|
||||
return json_error(404, "Le contenu envoyé n'est pas une liste")
|
||||
|
||||
errors: dict[int, str] = {}
|
||||
success: dict[int, object] = {}
|
||||
for i, data in enumerate(create_list):
|
||||
etud: Identite = Identite.query.filter_by(id=data["etudid"]).first()
|
||||
if etud is None:
|
||||
errors[i] = "Cet étudiant n'existe pas."
|
||||
continue
|
||||
|
||||
code, obj = _create_singular(data, etud)
|
||||
if code == 404:
|
||||
errors[i] = obj
|
||||
else:
|
||||
success[i] = obj
|
||||
|
||||
return jsonify({"errors": errors, "success": success})
|
||||
|
||||
|
||||
def _create_singular(
|
||||
data: dict,
|
||||
etud: Identite,
|
||||
) -> tuple[int, object]:
|
||||
errors: list[str] = []
|
||||
|
||||
# -- vérifications de l'objet json --
|
||||
# cas 1 : ETAT
|
||||
etat = data.get("etat", None)
|
||||
if etat is None:
|
||||
errors.append("param 'etat': manquant")
|
||||
elif not scu.EtatAssiduite.contains(etat):
|
||||
errors.append("param 'etat': invalide")
|
||||
|
||||
etat = scu.EtatAssiduite.get(etat)
|
||||
|
||||
# cas 2 : date_debut
|
||||
date_debut = data.get("date_debut", None)
|
||||
if date_debut is None:
|
||||
errors.append("param 'date_debut': manquant")
|
||||
deb = scu.is_iso_formated(date_debut, convert=True)
|
||||
if deb is None:
|
||||
errors.append("param 'date_debut': format invalide")
|
||||
|
||||
# cas 3 : date_fin
|
||||
date_fin = data.get("date_fin", None)
|
||||
if date_fin is None:
|
||||
errors.append("param 'date_fin': manquant")
|
||||
fin = scu.is_iso_formated(date_fin, convert=True)
|
||||
if fin is None:
|
||||
errors.append("param 'date_fin': format invalide")
|
||||
|
||||
# cas 4 : moduleimpl_id
|
||||
|
||||
moduleimpl_id = data.get("moduleimpl_id", False)
|
||||
moduleimpl: ModuleImpl = None
|
||||
|
||||
if moduleimpl_id is not False:
|
||||
moduleimpl = ModuleImpl.query.filter_by(id=int(moduleimpl_id)).first()
|
||||
if moduleimpl is None:
|
||||
errors.append("param 'moduleimpl_id': invalide")
|
||||
|
||||
# cas 5 : desc
|
||||
|
||||
desc: str = data.get("desc", None)
|
||||
|
||||
if errors:
|
||||
err: str = ", ".join(errors)
|
||||
return (404, err)
|
||||
|
||||
# TOUT EST OK
|
||||
try:
|
||||
nouv_assiduite: Assiduite = Assiduite.create_assiduite(
|
||||
date_debut=deb,
|
||||
date_fin=fin,
|
||||
etat=etat,
|
||||
etud=etud,
|
||||
moduleimpl=moduleimpl,
|
||||
description=desc,
|
||||
user_id=current_user.id,
|
||||
)
|
||||
|
||||
db.session.add(nouv_assiduite)
|
||||
db.session.commit()
|
||||
|
||||
return (200, {"assiduite_id": nouv_assiduite.id})
|
||||
except ScoValueError as excp:
|
||||
return (
|
||||
404,
|
||||
excp.args[0],
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/assiduite/delete", methods=["POST"])
|
||||
@api_web_bp.route("/assiduite/delete", methods=["POST"])
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def assiduite_delete():
|
||||
"""
|
||||
Suppression d'une assiduité à partir de son id
|
||||
|
||||
Forme des données envoyées :
|
||||
|
||||
[
|
||||
<assiduite_id:int>,
|
||||
...
|
||||
]
|
||||
|
||||
|
||||
"""
|
||||
assiduites_list: list[int] = request.get_json(force=True)
|
||||
if not isinstance(assiduites_list, list):
|
||||
return json_error(404, "Le contenu envoyé n'est pas une liste")
|
||||
|
||||
output = {"errors": {}, "success": {}}
|
||||
|
||||
for i, ass in enumerate(assiduites_list):
|
||||
code, msg = _delete_singular(ass, db)
|
||||
if code == 404:
|
||||
output["errors"][f"{i}"] = msg
|
||||
else:
|
||||
output["success"][f"{i}"] = {"OK": True}
|
||||
db.session.commit()
|
||||
return jsonify(output)
|
||||
|
||||
|
||||
def _delete_singular(assiduite_id: int, database):
|
||||
assiduite_unique: Assiduite = Assiduite.query.filter_by(id=assiduite_id).first()
|
||||
if assiduite_unique is None:
|
||||
return (404, "Assiduite non existante")
|
||||
database.session.delete(assiduite_unique)
|
||||
return (200, "OK")
|
||||
|
||||
|
||||
@bp.route("/assiduite/<int:assiduite_id>/edit", methods=["POST"])
|
||||
@api_web_bp.route("/assiduite/<int:assiduite_id>/edit", methods=["POST"])
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def assiduite_edit(assiduite_id: int):
|
||||
"""
|
||||
Edition d'une assiduité à partir de son id
|
||||
La requête doit avoir un content type "application/json":
|
||||
{
|
||||
"etat"?: str,
|
||||
"moduleimpl_id"?: int
|
||||
"desc"?: str
|
||||
"est_just"?: bool
|
||||
}
|
||||
"""
|
||||
assiduite_unique: Assiduite = Assiduite.query.filter_by(
|
||||
id=assiduite_id
|
||||
).first_or_404()
|
||||
errors: list[str] = []
|
||||
data = request.get_json(force=True)
|
||||
|
||||
# Vérifications de data
|
||||
|
||||
# Cas 1 : Etat
|
||||
if data.get("etat") is not None:
|
||||
etat = scu.EtatAssiduite.get(data.get("etat"))
|
||||
if etat is None:
|
||||
errors.append("param 'etat': invalide")
|
||||
else:
|
||||
assiduite_unique.etat = etat
|
||||
|
||||
# Cas 2 : Moduleimpl_id
|
||||
moduleimpl_id = data.get("moduleimpl_id", False)
|
||||
moduleimpl: ModuleImpl = None
|
||||
|
||||
if moduleimpl_id is not False:
|
||||
if moduleimpl_id is not None:
|
||||
moduleimpl = ModuleImpl.query.filter_by(id=int(moduleimpl_id)).first()
|
||||
if moduleimpl is None:
|
||||
errors.append("param 'moduleimpl_id': invalide")
|
||||
else:
|
||||
if not moduleimpl.est_inscrit(
|
||||
Identite.query.filter_by(id=assiduite_unique.etudid).first()
|
||||
):
|
||||
errors.append("param 'moduleimpl_id': etud non inscrit")
|
||||
else:
|
||||
assiduite_unique.moduleimpl_id = moduleimpl_id
|
||||
else:
|
||||
assiduite_unique.moduleimpl_id = moduleimpl_id
|
||||
|
||||
# Cas 3 : desc
|
||||
desc = data.get("desc", False)
|
||||
if desc is not False:
|
||||
assiduite_unique.desc = desc
|
||||
|
||||
# Cas 4 : est_just
|
||||
est_just = data.get("est_just")
|
||||
if est_just is not None:
|
||||
if not isinstance(est_just, bool):
|
||||
errors.append("param 'est_just' : booléen non reconnu")
|
||||
else:
|
||||
assiduite_unique.est_just = est_just
|
||||
|
||||
if errors:
|
||||
err: str = ", ".join(errors)
|
||||
return json_error(404, err)
|
||||
|
||||
db.session.add(assiduite_unique)
|
||||
db.session.commit()
|
||||
return jsonify({"OK": True})
|
||||
|
||||
|
||||
@bp.route("/assiduites/edit", methods=["POST"])
|
||||
@api_web_bp.route("/assiduites/edit", methods=["POST"])
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def assiduites_edit():
|
||||
"""
|
||||
Edition d'une assiduité à partir de son id
|
||||
La requête doit avoir un content type "application/json":
|
||||
{
|
||||
"etat"?: str,
|
||||
"moduleimpl_id"?: int
|
||||
"desc"?: str
|
||||
"est_just"?: bool
|
||||
}
|
||||
"""
|
||||
edit_list: list[object] = request.get_json(force=True)
|
||||
|
||||
if not isinstance(edit_list, list):
|
||||
return json_error(404, "Le contenu envoyé n'est pas une liste")
|
||||
|
||||
errors: dict[int, str] = {}
|
||||
success: dict[int, object] = {}
|
||||
for i, data in enumerate(edit_list):
|
||||
assi: Identite = Assiduite.query.filter_by(id=data["assiduite_id"]).first()
|
||||
if assi is None:
|
||||
errors[i] = "Cet assiduité n'existe pas."
|
||||
continue
|
||||
|
||||
code, obj = _edit_singular(assi, data)
|
||||
if code == 404:
|
||||
errors[i] = obj
|
||||
else:
|
||||
success[i] = obj
|
||||
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({"errors": errors, "success": success})
|
||||
|
||||
|
||||
def _edit_singular(assiduite_unique, data):
|
||||
errors: list[str] = []
|
||||
|
||||
# Vérifications de data
|
||||
|
||||
# Cas 1 : Etat
|
||||
if data.get("etat") is not None:
|
||||
etat = scu.EtatAssiduite.get(data.get("etat"))
|
||||
if etat is None:
|
||||
errors.append("param 'etat': invalide")
|
||||
else:
|
||||
assiduite_unique.etat = etat
|
||||
|
||||
# Cas 2 : Moduleimpl_id
|
||||
moduleimpl_id = data.get("moduleimpl_id", False)
|
||||
moduleimpl: ModuleImpl = None
|
||||
|
||||
if moduleimpl_id is not False:
|
||||
if moduleimpl_id is not None:
|
||||
moduleimpl = ModuleImpl.query.filter_by(id=int(moduleimpl_id)).first()
|
||||
if moduleimpl is None:
|
||||
errors.append("param 'moduleimpl_id': invalide")
|
||||
else:
|
||||
if not moduleimpl.est_inscrit(
|
||||
Identite.query.filter_by(id=assiduite_unique.etudid).first()
|
||||
):
|
||||
errors.append("param 'moduleimpl_id': etud non inscrit")
|
||||
else:
|
||||
assiduite_unique.moduleimpl_id = moduleimpl_id
|
||||
else:
|
||||
assiduite_unique.moduleimpl_id = moduleimpl_id
|
||||
|
||||
# Cas 3 : desc
|
||||
desc = data.get("desc", False)
|
||||
if desc is not False:
|
||||
assiduite_unique.desc = desc
|
||||
|
||||
# Cas 4 : est_just
|
||||
est_just = data.get("est_just")
|
||||
if est_just is not None:
|
||||
if not isinstance(est_just, bool):
|
||||
errors.append("param 'est_just' : booléen non reconnu")
|
||||
else:
|
||||
assiduite_unique.est_just = est_just
|
||||
|
||||
if errors:
|
||||
err: str = ", ".join(errors)
|
||||
return (404, err)
|
||||
|
||||
db.session.add(assiduite_unique)
|
||||
|
||||
return (200, "OK")
|
||||
|
||||
|
||||
# -- Utils --
|
||||
|
||||
|
||||
def _count_manager(requested) -> tuple[str, dict]:
|
||||
"""
|
||||
Retourne la/les métriques à utiliser ainsi que le filtre donnés en query de la requête
|
||||
"""
|
||||
filtered: dict = {}
|
||||
# cas 1 : etat assiduite
|
||||
etat = requested.args.get("etat")
|
||||
if etat is not None:
|
||||
filtered["etat"] = etat
|
||||
|
||||
# cas 2 : date de début
|
||||
deb = requested.args.get("date_debut", "").replace(" ", "+")
|
||||
deb: datetime = scu.is_iso_formated(deb, True)
|
||||
if deb is not None:
|
||||
filtered["date_debut"] = deb
|
||||
|
||||
# cas 3 : date de fin
|
||||
fin = requested.args.get("date_fin", "").replace(" ", "+")
|
||||
fin = scu.is_iso_formated(fin, True)
|
||||
|
||||
if fin is not None:
|
||||
filtered["date_fin"] = fin
|
||||
|
||||
# cas 4 : moduleimpl_id
|
||||
module = requested.args.get("moduleimpl_id", False)
|
||||
try:
|
||||
if module is False:
|
||||
raise ValueError
|
||||
if module != "":
|
||||
module = int(module)
|
||||
else:
|
||||
module = None
|
||||
except ValueError:
|
||||
module = False
|
||||
|
||||
if module is not False:
|
||||
filtered["moduleimpl_id"] = module
|
||||
|
||||
# cas 5 : formsemestre_id
|
||||
formsemestre_id = requested.args.get("formsemestre_id")
|
||||
|
||||
if formsemestre_id is not None:
|
||||
formsemestre: FormSemestre = None
|
||||
formsemestre_id = int(formsemestre_id)
|
||||
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()
|
||||
filtered["formsemestre"] = formsemestre
|
||||
|
||||
# cas 6 : type
|
||||
metric = requested.args.get("metric", "all")
|
||||
|
||||
# cas 7 : est_just
|
||||
|
||||
est_just: str = requested.args.get("est_just")
|
||||
if est_just is not None:
|
||||
trues: tuple[str] = ("v", "t", "vrai", "true")
|
||||
falses: tuple[str] = ("f", "faux", "false")
|
||||
|
||||
if est_just.lower() in trues:
|
||||
filtered["est_just"] = True
|
||||
elif est_just.lower() in falses:
|
||||
filtered["est_just"] = False
|
||||
|
||||
# cas 8 : user_id
|
||||
|
||||
user_id = requested.args.get("user_id", False)
|
||||
if user_id is not False:
|
||||
filtered["user_id"] = user_id
|
||||
|
||||
return (metric, filtered)
|
||||
|
||||
|
||||
def _filter_manager(requested, assiduites_query: Assiduite):
|
||||
"""
|
||||
Retourne les assiduites entrées filtrées en fonction de la request
|
||||
"""
|
||||
# cas 1 : etat assiduite
|
||||
etat = requested.args.get("etat")
|
||||
if etat is not None:
|
||||
assiduites_query = scass.filter_assiduites_by_etat(assiduites_query, etat)
|
||||
|
||||
# cas 2 : date de début
|
||||
deb = requested.args.get("date_debut", "").replace(" ", "+")
|
||||
deb: datetime = scu.is_iso_formated(deb, True)
|
||||
|
||||
# cas 3 : date de fin
|
||||
fin = requested.args.get("date_fin", "").replace(" ", "+")
|
||||
fin = scu.is_iso_formated(fin, True)
|
||||
|
||||
if (deb, fin) != (None, None):
|
||||
assiduites_query: Assiduite = scass.filter_by_date(
|
||||
assiduites_query, Assiduite, deb, fin
|
||||
)
|
||||
|
||||
# cas 4 : moduleimpl_id
|
||||
module = requested.args.get("moduleimpl_id", False)
|
||||
try:
|
||||
if module is False:
|
||||
raise ValueError
|
||||
if module != "":
|
||||
module = int(module)
|
||||
else:
|
||||
module = None
|
||||
except ValueError:
|
||||
module = False
|
||||
|
||||
if module is not False:
|
||||
assiduites_query = scass.filter_by_module_impl(assiduites_query, module)
|
||||
|
||||
# cas 5 : formsemestre_id
|
||||
formsemestre_id = requested.args.get("formsemestre_id")
|
||||
|
||||
if formsemestre_id is not None:
|
||||
formsemestre: FormSemestre = None
|
||||
formsemestre_id = int(formsemestre_id)
|
||||
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()
|
||||
assiduites_query = scass.filter_by_formsemestre(assiduites_query, formsemestre)
|
||||
|
||||
# cas 6 : est_just
|
||||
|
||||
est_just: str = requested.args.get("est_just")
|
||||
if est_just is not None:
|
||||
trues: tuple[str] = ("v", "t", "vrai", "true")
|
||||
falses: tuple[str] = ("f", "faux", "false")
|
||||
|
||||
if est_just.lower() in trues:
|
||||
assiduites_query: Assiduite = scass.filter_assiduites_by_est_just(
|
||||
assiduites_query, True
|
||||
)
|
||||
elif est_just.lower() in falses:
|
||||
assiduites_query: Assiduite = scass.filter_assiduites_by_est_just(
|
||||
assiduites_query, False
|
||||
)
|
||||
|
||||
# cas 8 : user_id
|
||||
|
||||
user_id = requested.args.get("user_id", False)
|
||||
if user_id is not False:
|
||||
assiduites_query: Assiduite = scass.filter_by_user_id(assiduites_query, user_id)
|
||||
|
||||
return assiduites_query
|
37
app/api/etudiants.py
Normal file → Executable file
37
app/api/etudiants.py
Normal file → Executable file
@ -34,6 +34,7 @@ from app.scodoc.sco_bulletins import do_formsemestre_bulletinetud
|
||||
from app.scodoc.sco_permissions import Permission
|
||||
from app.scodoc.sco_utils import json_error, suppress_accents
|
||||
|
||||
import app.scodoc.sco_photos as sco_photos
|
||||
|
||||
# Un exemple:
|
||||
# @bp.route("/api_function/<int:arg>")
|
||||
@ -136,6 +137,42 @@ def etudiant(etudid: int = None, nip: str = None, ine: str = None):
|
||||
return etud.to_dict_api()
|
||||
|
||||
|
||||
@api_web_bp.route("/etudiant/etudid/<int:etudid>/photo")
|
||||
@api_web_bp.route("/etudiant/nip/<string:nip>/photo")
|
||||
@api_web_bp.route("/etudiant/ine/<string:ine>/photo")
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def get_photo_image(etudid: int = None, nip: str = None, ine: str = None):
|
||||
"""
|
||||
Retourne la photo de l'étudiant
|
||||
correspondant ou un placeholder si non existant.
|
||||
|
||||
etudid : l'etudid de l'étudiant
|
||||
nip : le code nip de l'étudiant
|
||||
ine : le code ine de l'étudiant
|
||||
|
||||
Attention : Ne peut être qu'utilisée en tant que route de département
|
||||
"""
|
||||
|
||||
etud = tools.get_etud(etudid, nip, ine)
|
||||
|
||||
if etud is None:
|
||||
return json_error(
|
||||
404,
|
||||
message="étudiant inconnu",
|
||||
)
|
||||
if not etudid:
|
||||
filename = sco_photos.UNKNOWN_IMAGE_PATH
|
||||
|
||||
size = request.args.get("size", "orig")
|
||||
filename = sco_photos.photo_pathname(etud.photo_filename, size=size)
|
||||
if not filename:
|
||||
filename = sco_photos.UNKNOWN_IMAGE_PATH
|
||||
res = sco_photos.build_image_response(filename)
|
||||
return res
|
||||
|
||||
|
||||
@bp.route("/etudiants/etudid/<int:etudid>", methods=["GET"])
|
||||
@bp.route("/etudiants/nip/<string:nip>", methods=["GET"])
|
||||
@bp.route("/etudiants/ine/<string:ine>", methods=["GET"])
|
||||
|
591
app/api/justificatifs.py
Normal file
591
app/api/justificatifs.py
Normal file
@ -0,0 +1,591 @@
|
||||
##############################################################################
|
||||
# ScoDoc
|
||||
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
|
||||
# See LICENSE
|
||||
##############################################################################
|
||||
"""ScoDoc 9 API : Assiduités
|
||||
"""
|
||||
from datetime import datetime
|
||||
|
||||
from flask import g, jsonify, request
|
||||
from flask_login import login_required, current_user
|
||||
|
||||
import app.scodoc.sco_assiduites as scass
|
||||
import app.scodoc.sco_utils as scu
|
||||
from app import db
|
||||
from app.api import api_bp as bp
|
||||
from app.api import api_web_bp
|
||||
from app.api import get_model_api_object
|
||||
from app.decorators import permission_required, scodoc
|
||||
from app.models import Identite, Justificatif
|
||||
from app.models.assiduites import compute_assiduites_justified
|
||||
from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
|
||||
from app.scodoc.sco_exceptions import ScoValueError
|
||||
from app.scodoc.sco_permissions import Permission
|
||||
from app.scodoc.sco_utils import json_error
|
||||
|
||||
|
||||
# Partie Modèle
|
||||
@bp.route("/justificatif/<int:justif_id>")
|
||||
@api_web_bp.route("/justificatif/<int:justif_id>")
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def justificatif(justif_id: int = None):
|
||||
"""Retourne un objet justificatif à partir de son id
|
||||
|
||||
Exemple de résultat:
|
||||
{
|
||||
"justif_id": 1,
|
||||
"etudid": 2,
|
||||
"date_debut": "2022-10-31T08:00+01:00",
|
||||
"date_fin": "2022-10-31T10:00+01:00",
|
||||
"etat": "valide",
|
||||
"fichier": "archive_id",
|
||||
"raison": "une raison",
|
||||
"entry_date": "2022-10-31T08:00+01:00",
|
||||
"user_id": 1 or null,
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
return get_model_api_object(Justificatif, justif_id, Identite)
|
||||
|
||||
|
||||
@bp.route("/justificatifs/<int:etudid>", defaults={"with_query": False})
|
||||
@bp.route("/justificatifs/<int:etudid>/query", defaults={"with_query": True})
|
||||
@api_web_bp.route("/justificatifs/<int:etudid>", defaults={"with_query": False})
|
||||
@api_web_bp.route("/justificatifs/<int:etudid>/query", defaults={"with_query": True})
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
def justificatifs(etudid: int = None, with_query: bool = False):
|
||||
"""
|
||||
Retourne toutes les assiduités d'un étudiant
|
||||
chemin : /justificatifs/<int:etudid>
|
||||
|
||||
Un filtrage peut être donné avec une query
|
||||
chemin : /justificatifs/<int:etudid>/query?
|
||||
|
||||
Les différents filtres :
|
||||
Etat (etat du justificatif -> validé, non validé, modifé, en attente):
|
||||
query?etat=[- liste des états séparé par une virgule -]
|
||||
ex: .../query?etat=validé,modifié
|
||||
Date debut
|
||||
(date de début du justificatif, sont affichés les justificatifs
|
||||
dont la date de début est supérieur ou égale à la valeur donnée):
|
||||
query?date_debut=[- date au format iso -]
|
||||
ex: query?date_debut=2022-11-03T08:00+01:00
|
||||
Date fin
|
||||
(date de fin du justificatif, sont affichés les justificatifs
|
||||
dont la date de fin est inférieure ou égale à la valeur donnée):
|
||||
query?date_fin=[- date au format iso -]
|
||||
ex: query?date_fin=2022-11-03T10:00+01:00
|
||||
user_id (l'id de l'auteur du justificatif)
|
||||
query?user_id=[int]
|
||||
ex query?user_id=3
|
||||
"""
|
||||
|
||||
query = Identite.query.filter_by(id=etudid)
|
||||
if g.scodoc_dept:
|
||||
query = query.filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
etud: Identite = query.first_or_404(etudid)
|
||||
justificatifs_query = etud.justificatifs
|
||||
|
||||
if with_query:
|
||||
justificatifs_query = _filter_manager(request, justificatifs_query)
|
||||
|
||||
data_set: list[dict] = []
|
||||
for just in justificatifs_query.all():
|
||||
data = just.to_dict(format_api=True)
|
||||
data_set.append(data)
|
||||
|
||||
return jsonify(data_set)
|
||||
|
||||
|
||||
@bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
|
||||
@api_web_bp.route("/justificatif/<int:etudid>/create", methods=["POST"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_create(etudid: int = None):
|
||||
"""
|
||||
Création d'un justificatif pour l'étudiant (etudid)
|
||||
La requête doit avoir un content type "application/json":
|
||||
[
|
||||
{
|
||||
"date_debut": str,
|
||||
"date_fin": str,
|
||||
"etat": str,
|
||||
},
|
||||
{
|
||||
"date_debut": str,
|
||||
"date_fin": str,
|
||||
"etat": str,
|
||||
"raison":str,
|
||||
}
|
||||
...
|
||||
]
|
||||
|
||||
"""
|
||||
etud: Identite = Identite.query.filter_by(id=etudid).first_or_404()
|
||||
|
||||
create_list: list[object] = request.get_json(force=True)
|
||||
|
||||
if not isinstance(create_list, list):
|
||||
return json_error(404, "Le contenu envoyé n'est pas une liste")
|
||||
|
||||
errors: dict[int, str] = {}
|
||||
success: dict[int, object] = {}
|
||||
for i, data in enumerate(create_list):
|
||||
code, obj = _create_singular(data, etud)
|
||||
if code == 404:
|
||||
errors[i] = obj
|
||||
else:
|
||||
success[i] = obj
|
||||
compute_assiduites_justified(Justificatif.query.filter_by(etudid=etudid), True)
|
||||
return jsonify({"errors": errors, "success": success})
|
||||
|
||||
|
||||
def _create_singular(
|
||||
data: dict,
|
||||
etud: Identite,
|
||||
) -> tuple[int, object]:
|
||||
errors: list[str] = []
|
||||
|
||||
# -- vérifications de l'objet json --
|
||||
# cas 1 : ETAT
|
||||
etat = data.get("etat", None)
|
||||
if etat is None:
|
||||
errors.append("param 'etat': manquant")
|
||||
elif not scu.EtatJustificatif.contains(etat):
|
||||
errors.append("param 'etat': invalide")
|
||||
|
||||
etat = scu.EtatJustificatif.get(etat)
|
||||
|
||||
# cas 2 : date_debut
|
||||
date_debut = data.get("date_debut", None)
|
||||
if date_debut is None:
|
||||
errors.append("param 'date_debut': manquant")
|
||||
deb = scu.is_iso_formated(date_debut, convert=True)
|
||||
if deb is None:
|
||||
errors.append("param 'date_debut': format invalide")
|
||||
|
||||
# cas 3 : date_fin
|
||||
date_fin = data.get("date_fin", None)
|
||||
if date_fin is None:
|
||||
errors.append("param 'date_fin': manquant")
|
||||
fin = scu.is_iso_formated(date_fin, convert=True)
|
||||
if fin is None:
|
||||
errors.append("param 'date_fin': format invalide")
|
||||
|
||||
# cas 4 : raison
|
||||
|
||||
raison: str = data.get("raison", None)
|
||||
|
||||
if errors:
|
||||
err: str = ", ".join(errors)
|
||||
return (404, err)
|
||||
|
||||
# TOUT EST OK
|
||||
|
||||
try:
|
||||
nouv_justificatif: Justificatif = Justificatif.create_justificatif(
|
||||
date_debut=deb,
|
||||
date_fin=fin,
|
||||
etat=etat,
|
||||
etud=etud,
|
||||
raison=raison,
|
||||
user_id=current_user.id,
|
||||
)
|
||||
|
||||
db.session.add(nouv_justificatif)
|
||||
db.session.commit()
|
||||
|
||||
return (
|
||||
200,
|
||||
{
|
||||
"justif_id": nouv_justificatif.id,
|
||||
"couverture": scass.justifies(nouv_justificatif),
|
||||
},
|
||||
)
|
||||
except ScoValueError as excp:
|
||||
return (
|
||||
404,
|
||||
excp.args[0],
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"])
|
||||
@api_web_bp.route("/justificatif/<int:justif_id>/edit", methods=["POST"])
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_edit(justif_id: int):
|
||||
"""
|
||||
Edition d'un justificatif à partir de son id
|
||||
La requête doit avoir un content type "application/json":
|
||||
|
||||
{
|
||||
"etat"?: str,
|
||||
"raison"?: str
|
||||
"date_debut"?: str
|
||||
"date_fin"?: str
|
||||
}
|
||||
"""
|
||||
justificatif_unique: Justificatif = Justificatif.query.filter_by(
|
||||
id=justif_id
|
||||
).first_or_404()
|
||||
|
||||
errors: list[str] = []
|
||||
data = request.get_json(force=True)
|
||||
avant_ids: list[int] = scass.justifies(justificatif_unique)
|
||||
# Vérifications de data
|
||||
|
||||
# Cas 1 : Etat
|
||||
if data.get("etat") is not None:
|
||||
etat = scu.EtatJustificatif.get(data.get("etat"))
|
||||
if etat is None:
|
||||
errors.append("param 'etat': invalide")
|
||||
else:
|
||||
justificatif_unique.etat = etat
|
||||
|
||||
# Cas 2 : raison
|
||||
raison = data.get("raison", False)
|
||||
if raison is not False:
|
||||
justificatif_unique.raison = raison
|
||||
|
||||
deb, fin = None, None
|
||||
|
||||
# cas 3 : date_debut
|
||||
date_debut = data.get("date_debut", False)
|
||||
if date_debut is not False:
|
||||
if date_debut is None:
|
||||
errors.append("param 'date_debut': manquant")
|
||||
deb = scu.is_iso_formated(date_debut.replace(" ", "+"), convert=True)
|
||||
if deb is None:
|
||||
errors.append("param 'date_debut': format invalide")
|
||||
|
||||
if justificatif_unique.date_fin >= deb:
|
||||
errors.append("param 'date_debut': date de début située après date de fin ")
|
||||
|
||||
# cas 4 : date_fin
|
||||
date_fin = data.get("date_fin", False)
|
||||
if date_fin is not False:
|
||||
if date_fin is None:
|
||||
errors.append("param 'date_fin': manquant")
|
||||
fin = scu.is_iso_formated(date_fin.replace(" ", "+"), convert=True)
|
||||
if fin is None:
|
||||
errors.append("param 'date_fin': format invalide")
|
||||
if justificatif_unique.date_debut <= fin:
|
||||
errors.append("param 'date_fin': date de fin située avant date de début ")
|
||||
|
||||
# Mise à jour des dates
|
||||
deb = deb if deb is not None else justificatif_unique.date_debut
|
||||
fin = fin if fin is not None else justificatif_unique.date_fin
|
||||
|
||||
justificatif_unique.date_debut = deb
|
||||
justificatif_unique.date_fin = fin
|
||||
|
||||
if errors:
|
||||
err: str = ", ".join(errors)
|
||||
return json_error(404, err)
|
||||
|
||||
db.session.add(justificatif_unique)
|
||||
db.session.commit()
|
||||
|
||||
return jsonify(
|
||||
{
|
||||
"couverture": {
|
||||
"avant": avant_ids,
|
||||
"après": compute_assiduites_justified(
|
||||
Justificatif.query.filter_by(etudid=justificatif_unique.etudid),
|
||||
True,
|
||||
),
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@bp.route("/justificatif/delete", methods=["POST"])
|
||||
@api_web_bp.route("/justificatif/delete", methods=["POST"])
|
||||
@login_required
|
||||
@scodoc
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_delete():
|
||||
"""
|
||||
Suppression d'un justificatif à partir de son id
|
||||
|
||||
Forme des données envoyées :
|
||||
|
||||
[
|
||||
<justif_id:int>,
|
||||
...
|
||||
]
|
||||
|
||||
|
||||
"""
|
||||
justificatifs_list: list[int] = request.get_json(force=True)
|
||||
if not isinstance(justificatifs_list, list):
|
||||
return json_error(404, "Le contenu envoyé n'est pas une liste")
|
||||
|
||||
output = {"errors": {}, "success": {}}
|
||||
|
||||
for i, ass in enumerate(justificatifs_list):
|
||||
code, msg = _delete_singular(ass, db)
|
||||
if code == 404:
|
||||
output["errors"][f"{i}"] = msg
|
||||
else:
|
||||
output["success"][f"{i}"] = {"OK": True}
|
||||
db.session.commit()
|
||||
return jsonify(output)
|
||||
|
||||
|
||||
def _delete_singular(justif_id: int, database):
|
||||
justificatif_unique: Justificatif = Justificatif.query.filter_by(
|
||||
id=justif_id
|
||||
).first()
|
||||
if justificatif_unique is None:
|
||||
return (404, "Justificatif non existant")
|
||||
|
||||
archive_name: str = justificatif_unique.fichier
|
||||
|
||||
if archive_name is not None:
|
||||
archiver: JustificatifArchiver = JustificatifArchiver()
|
||||
archiver.delete_justificatif(justificatif_unique.etudid, archive_name)
|
||||
|
||||
database.session.delete(justificatif_unique)
|
||||
return (200, "OK")
|
||||
|
||||
|
||||
# Partie archivage
|
||||
@bp.route("/justificatif/<int:justif_id>/import", methods=["POST"])
|
||||
@api_web_bp.route("/justificatif/<int:justif_id>/import", methods=["POST"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_import(justif_id: int = None):
|
||||
"""
|
||||
Importation d'un fichier (création d'archive)
|
||||
"""
|
||||
if len(request.files) == 0:
|
||||
return json_error(404, "Il n'y a pas de fichier joint")
|
||||
|
||||
file = list(request.files.values())[0]
|
||||
if file.filename == "":
|
||||
return json_error(404, "Il n'y a pas de fichier joint")
|
||||
|
||||
query = Justificatif.query.filter_by(id=justif_id)
|
||||
if g.scodoc_dept:
|
||||
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
justificatif_unique: Justificatif = query.first_or_404()
|
||||
|
||||
archive_name: str = justificatif_unique.fichier
|
||||
|
||||
archiver: JustificatifArchiver = JustificatifArchiver()
|
||||
try:
|
||||
fname: str
|
||||
archive_name, fname = archiver.save_justificatif(
|
||||
etudid=justificatif_unique.etudid,
|
||||
filename=file.filename,
|
||||
data=file.stream.read(),
|
||||
archive_name=archive_name,
|
||||
)
|
||||
|
||||
justificatif_unique.fichier = archive_name
|
||||
|
||||
db.session.add(justificatif_unique)
|
||||
db.session.commit()
|
||||
|
||||
return jsonify({"filename": fname})
|
||||
except ScoValueError as err:
|
||||
return json_error(404, err.args[0])
|
||||
|
||||
|
||||
@bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"])
|
||||
@api_web_bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_export(justif_id: int = None, filename: str = None):
|
||||
"""
|
||||
Retourne un fichier d'une archive d'un justificatif
|
||||
"""
|
||||
|
||||
query = Justificatif.query.filter_by(id=justif_id)
|
||||
if g.scodoc_dept:
|
||||
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
justificatif_unique: Justificatif = query.first_or_404()
|
||||
|
||||
archive_name: str = justificatif_unique.fichier
|
||||
if archive_name is None:
|
||||
return json_error(404, "le justificatif ne possède pas de fichier")
|
||||
|
||||
archiver: JustificatifArchiver = JustificatifArchiver()
|
||||
|
||||
try:
|
||||
return archiver.get_justificatif_file(
|
||||
archive_name, justificatif_unique.etudid, filename
|
||||
)
|
||||
except ScoValueError as err:
|
||||
return json_error(404, err.args[0])
|
||||
|
||||
|
||||
@bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"])
|
||||
@api_web_bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_remove(justif_id: int = None):
|
||||
"""
|
||||
Supression d'un fichier ou d'une archive
|
||||
# TOTALK: Doc, expliquer les noms coté server
|
||||
{
|
||||
"remove": <"all"/"list">
|
||||
|
||||
"filenames"?: [
|
||||
<filename:str>,
|
||||
...
|
||||
]
|
||||
}
|
||||
"""
|
||||
|
||||
data: dict = request.get_json(force=True)
|
||||
|
||||
query = Justificatif.query.filter_by(id=justif_id)
|
||||
if g.scodoc_dept:
|
||||
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
justificatif_unique: Justificatif = query.first_or_404()
|
||||
|
||||
archive_name: str = justificatif_unique.fichier
|
||||
if archive_name is None:
|
||||
return json_error(404, "le justificatif ne possède pas de fichier")
|
||||
|
||||
remove: str = data.get("remove")
|
||||
if remove is None or remove not in ("all", "list"):
|
||||
return json_error(404, "param 'remove': Valeur invalide")
|
||||
archiver: JustificatifArchiver = JustificatifArchiver()
|
||||
etudid: int = justificatif_unique.etudid
|
||||
try:
|
||||
if remove == "all":
|
||||
archiver.delete_justificatif(etudid=etudid, archive_name=archive_name)
|
||||
justificatif_unique.fichier = None
|
||||
db.session.add(justificatif_unique)
|
||||
db.session.commit()
|
||||
|
||||
else:
|
||||
for fname in data.get("filenames", []):
|
||||
archiver.delete_justificatif(
|
||||
etudid=etudid,
|
||||
archive_name=archive_name,
|
||||
filename=fname,
|
||||
)
|
||||
|
||||
if len(archiver.list_justificatifs(archive_name, etudid)) == 0:
|
||||
archiver.delete_justificatif(etudid, archive_name)
|
||||
justificatif_unique.fichier = None
|
||||
db.session.add(justificatif_unique)
|
||||
db.session.commit()
|
||||
|
||||
except ScoValueError as err:
|
||||
return json_error(404, err.args[0])
|
||||
|
||||
return jsonify({"response": "removed"})
|
||||
|
||||
|
||||
@bp.route("/justificatif/<int:justif_id>/list", methods=["GET"])
|
||||
@api_web_bp.route("/justificatif/<int:justif_id>/list", methods=["GET"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_list(justif_id: int = None):
|
||||
"""
|
||||
Liste les fichiers du justificatif
|
||||
"""
|
||||
|
||||
query = Justificatif.query.filter_by(id=justif_id)
|
||||
if g.scodoc_dept:
|
||||
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
justificatif_unique: Justificatif = query.first_or_404()
|
||||
|
||||
archive_name: str = justificatif_unique.fichier
|
||||
|
||||
filenames: list[str] = []
|
||||
|
||||
archiver: JustificatifArchiver = JustificatifArchiver()
|
||||
if archive_name is not None:
|
||||
filenames = archiver.list_justificatifs(
|
||||
archive_name, justificatif_unique.etudid
|
||||
)
|
||||
|
||||
return jsonify(filenames)
|
||||
|
||||
|
||||
# Partie justification
|
||||
@bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"])
|
||||
@api_web_bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"])
|
||||
@scodoc
|
||||
@login_required
|
||||
@permission_required(Permission.ScoView)
|
||||
# @permission_required(Permission.ScoAssiduiteChange)
|
||||
def justif_justifies(justif_id: int = None):
|
||||
"""
|
||||
Liste assiduite_id justifiées par le justificatif
|
||||
"""
|
||||
|
||||
query = Justificatif.query.filter_by(id=justif_id)
|
||||
if g.scodoc_dept:
|
||||
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
|
||||
|
||||
justificatif_unique: Justificatif = query.first_or_404()
|
||||
|
||||
assiduites_list: list[int] = scass.justifies(justificatif_unique)
|
||||
|
||||
return jsonify(assiduites_list)
|
||||
|
||||
|
||||
# -- Utils --
|
||||
|
||||
|
||||
def _filter_manager(requested, justificatifs_query):
|
||||
"""
|
||||
Retourne les justificatifs entrés filtrés en fonction de la request
|
||||
"""
|
||||
# cas 1 : etat justificatif
|
||||
etat = requested.args.get("etat")
|
||||
if etat is not None:
|
||||
justificatifs_query = scass.filter_justificatifs_by_etat(
|
||||
justificatifs_query, etat
|
||||
)
|
||||
|
||||
# cas 2 : date de début
|
||||
deb = requested.args.get("date_debut", "").replace(" ", "+")
|
||||
deb: datetime = scu.is_iso_formated(deb, True)
|
||||
|
||||
# cas 3 : date de fin
|
||||
fin = requested.args.get("date_fin", "").replace(" ", "+")
|
||||
fin = scu.is_iso_formated(fin, True)
|
||||
|
||||
if (deb, fin) != (None, None):
|
||||
justificatifs_query: Justificatif = scass.filter_by_date(
|
||||
justificatifs_query, Justificatif, deb, fin
|
||||
)
|
||||
|
||||
user_id = requested.args.get("user_id", False)
|
||||
if user_id is not False:
|
||||
justificatif_query: Justificatif = scass.filter_by_user_id(
|
||||
justificatif_query, user_id
|
||||
)
|
||||
|
||||
return justificatifs_query
|
@ -68,7 +68,7 @@ from app import log, ScoDocJSONEncoder
|
||||
from app.but import jury_but_pv
|
||||
from app.comp import res_sem
|
||||
from app.comp.res_compat import NotesTableCompat
|
||||
from app.models import Departement, FormSemestre
|
||||
from app.models import FormSemestre
|
||||
from app.scodoc.TrivialFormulator import TrivialFormulator
|
||||
from app.scodoc.sco_exceptions import ScoException, ScoPermissionDenied
|
||||
from app.scodoc import html_sco_header
|
||||
@ -86,6 +86,11 @@ class BaseArchiver(object):
|
||||
self.archive_type = archive_type
|
||||
self.initialized = False
|
||||
self.root = None
|
||||
self.dept_id = None
|
||||
|
||||
def set_dept_id(self, dept_id: int):
|
||||
"set dept"
|
||||
self.dept_id = dept_id
|
||||
|
||||
def initialize(self):
|
||||
if self.initialized:
|
||||
@ -107,6 +112,8 @@ class BaseArchiver(object):
|
||||
finally:
|
||||
scu.GSL.release()
|
||||
self.initialized = True
|
||||
if self.dept_id is None:
|
||||
self.dept_id = getattr(g, "scodoc_dept_id")
|
||||
|
||||
def get_obj_dir(self, oid: int):
|
||||
"""
|
||||
@ -114,8 +121,7 @@ class BaseArchiver(object):
|
||||
If directory does not yet exist, create it.
|
||||
"""
|
||||
self.initialize()
|
||||
dept = Departement.query.filter_by(acronym=g.scodoc_dept).first()
|
||||
dept_dir = os.path.join(self.root, str(dept.id))
|
||||
dept_dir = os.path.join(self.root, str(self.dept_id))
|
||||
try:
|
||||
scu.GSL.acquire()
|
||||
if not os.path.isdir(dept_dir):
|
||||
@ -140,8 +146,7 @@ class BaseArchiver(object):
|
||||
:return: list of archive oids
|
||||
"""
|
||||
self.initialize()
|
||||
dept = Departement.query.filter_by(acronym=g.scodoc_dept).first()
|
||||
base = os.path.join(self.root, str(dept.id)) + os.path.sep
|
||||
base = os.path.join(self.root, str(self.dept_id)) + os.path.sep
|
||||
dirs = glob.glob(base + "*")
|
||||
return [os.path.split(x)[1] for x in dirs]
|
||||
|
||||
|
215
app/scodoc/sco_archives_justificatifs.py
Normal file
215
app/scodoc/sco_archives_justificatifs.py
Normal file
@ -0,0 +1,215 @@
|
||||
"""
|
||||
Gestion de l'archivage des justificatifs
|
||||
|
||||
Ecrit par Matthias HARTMANN
|
||||
"""
|
||||
import os
|
||||
from datetime import datetime
|
||||
from shutil import rmtree
|
||||
|
||||
from app.models import Identite
|
||||
from app.scodoc.sco_archives import BaseArchiver
|
||||
from app.scodoc.sco_exceptions import ScoValueError
|
||||
from app.scodoc.sco_utils import is_iso_formated
|
||||
|
||||
|
||||
class Trace:
|
||||
"""gestionnaire de la trace des fichiers justificatifs"""
|
||||
|
||||
def __init__(self, path: str) -> None:
|
||||
self.path: str = path + "/_trace.csv"
|
||||
self.content: dict[str, list[datetime, datetime]] = {}
|
||||
self.import_from_file()
|
||||
|
||||
def import_from_file(self):
|
||||
"""import trace from file"""
|
||||
if os.path.isfile(self.path):
|
||||
with open(self.path, "r", encoding="utf-8") as file:
|
||||
for line in file.readlines():
|
||||
csv = line.split(",")
|
||||
fname: str = csv[0]
|
||||
entry_date: datetime = is_iso_formated(csv[1], True)
|
||||
delete_date: datetime = is_iso_formated(csv[2], True)
|
||||
|
||||
self.content[fname] = [entry_date, delete_date]
|
||||
|
||||
def set_trace(self, *fnames: str, mode: str = "entry"):
|
||||
"""Ajoute une trace du fichier donné
|
||||
mode : entry / delete
|
||||
"""
|
||||
modes: list[str] = ["entry", "delete"]
|
||||
for fname in fnames:
|
||||
if fname in modes:
|
||||
continue
|
||||
traced: list[datetime, datetime] = self.content.get(fname, False)
|
||||
if not traced:
|
||||
self.content[fname] = [None, None]
|
||||
traced = self.content[fname]
|
||||
|
||||
traced[modes.index(mode)] = datetime.now()
|
||||
self.save_trace()
|
||||
|
||||
def save_trace(self):
|
||||
"""Enregistre la trace dans le fichier _trace.csv"""
|
||||
lines: list[str] = []
|
||||
for fname, traced in self.content.items():
|
||||
date_fin: datetime or None = traced[1].isoformat() if traced[1] else "None"
|
||||
|
||||
lines.append(f"{fname},{traced[0].isoformat()},{date_fin}")
|
||||
with open(self.path, "w", encoding="utf-8") as file:
|
||||
file.write("\n".join(lines))
|
||||
|
||||
def get_trace(self, fnames: list[str] = ()) -> dict[str, list[datetime, datetime]]:
|
||||
"""Récupère la trace pour les noms de fichiers.
|
||||
si aucun nom n'est donné, récupère tous les fichiers"""
|
||||
|
||||
if fnames is None or len(fnames) == 0:
|
||||
return self.content
|
||||
|
||||
traced: dict = {}
|
||||
for fname in fnames:
|
||||
traced[fname] = self.content.get(fname, None)
|
||||
|
||||
return traced
|
||||
|
||||
|
||||
class JustificatifArchiver(BaseArchiver):
|
||||
"""
|
||||
|
||||
TOTALK:
|
||||
- oid -> etudid
|
||||
- archive_id -> date de création de l'archive (une archive par dépot de document)
|
||||
|
||||
justificatif
|
||||
└── <dept_id>
|
||||
└── <etudid/oid>
|
||||
├── [_trace.csv]
|
||||
└── <archive_id>
|
||||
├── [_description.txt]
|
||||
└── [<filename.ext>]
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
BaseArchiver.__init__(self, archive_type="justificatifs")
|
||||
|
||||
def save_justificatif(
|
||||
self,
|
||||
etudid: int,
|
||||
filename: str,
|
||||
data: bytes or str,
|
||||
archive_name: str = None,
|
||||
description: str = "",
|
||||
) -> str:
|
||||
"""
|
||||
Ajoute un fichier dans une archive "justificatif" pour l'etudid donné
|
||||
Retourne l'archive_name utilisé
|
||||
"""
|
||||
self._set_dept(etudid)
|
||||
if archive_name is None:
|
||||
archive_id: str = self.create_obj_archive(
|
||||
oid=etudid, description=description
|
||||
)
|
||||
else:
|
||||
archive_id: str = self.get_id_from_name(etudid, archive_name)
|
||||
|
||||
fname: str = self.store(archive_id, filename, data)
|
||||
|
||||
trace = Trace(self.get_obj_dir(etudid))
|
||||
trace.set_trace(fname, "entry")
|
||||
|
||||
return self.get_archive_name(archive_id), fname
|
||||
|
||||
def delete_justificatif(
|
||||
self,
|
||||
etudid: int,
|
||||
archive_name: str,
|
||||
filename: str = None,
|
||||
has_trace: bool = True,
|
||||
):
|
||||
"""
|
||||
Supprime une archive ou un fichier particulier de l'archivage de l'étudiant donné
|
||||
|
||||
Si trace == True : sauvegarde le nom du/des fichier(s) supprimé(s) dans la trace de l'étudiant
|
||||
"""
|
||||
self._set_dept(etudid)
|
||||
if str(etudid) not in self.list_oids():
|
||||
raise ValueError(f"Aucune archive pour etudid[{etudid}]")
|
||||
|
||||
archive_id = self.get_id_from_name(etudid, archive_name)
|
||||
|
||||
if filename is not None:
|
||||
if filename not in self.list_archive(archive_id):
|
||||
raise ValueError(
|
||||
f"filename {filename} inconnu dans l'archive archive_id[{archive_id}] -> etudid[{etudid}]"
|
||||
)
|
||||
|
||||
path: str = os.path.join(self.get_obj_dir(etudid), archive_id, filename)
|
||||
|
||||
if os.path.isfile(path):
|
||||
if has_trace:
|
||||
trace = Trace(self.get_obj_dir(etudid))
|
||||
trace.set_trace(filename, "delete")
|
||||
os.remove(path)
|
||||
|
||||
else:
|
||||
if has_trace:
|
||||
trace = Trace(self.get_obj_dir(etudid))
|
||||
trace.set_trace(*self.list_archive(archive_id), mode="delete")
|
||||
|
||||
self.delete_archive(
|
||||
os.path.join(
|
||||
self.get_obj_dir(etudid),
|
||||
archive_id,
|
||||
)
|
||||
)
|
||||
|
||||
def list_justificatifs(self, archive_name: str, etudid: int) -> list[str]:
|
||||
"""
|
||||
Retourne la liste des noms de fichiers dans l'archive donnée
|
||||
"""
|
||||
self._set_dept(etudid)
|
||||
filenames: list[str] = []
|
||||
archive_id = self.get_id_from_name(etudid, archive_name)
|
||||
|
||||
filenames = self.list_archive(archive_id)
|
||||
return filenames
|
||||
|
||||
def get_justificatif_file(self, archive_name: str, etudid: int, filename: str):
|
||||
"""
|
||||
Retourne une réponse de téléchargement de fichier si le fichier existe
|
||||
"""
|
||||
self._set_dept(etudid)
|
||||
archive_id: str = self.get_id_from_name(etudid, archive_name)
|
||||
if filename in self.list_archive(archive_id):
|
||||
return self.get_archived_file(etudid, archive_name, filename)
|
||||
raise ScoValueError(
|
||||
f"Fichier {filename} introuvable dans l'archive {archive_name}"
|
||||
)
|
||||
|
||||
def _set_dept(self, etudid: int):
|
||||
"""
|
||||
Mets à jour le dept_id de l'archiver en fonction du département de l'étudiant
|
||||
"""
|
||||
etud: Identite = Identite.query.filter_by(id=etudid).first()
|
||||
self.set_dept_id(etud.dept_id)
|
||||
|
||||
def remove_dept_archive(self, dept_id: int = None):
|
||||
"""
|
||||
Supprime toutes les archives d'un département (ou de tous les départements)
|
||||
⚠ Supprime aussi les fichiers de trace ⚠
|
||||
"""
|
||||
self.set_dept_id(1)
|
||||
self.initialize()
|
||||
|
||||
if dept_id is None:
|
||||
rmtree(self.root, ignore_errors=True)
|
||||
else:
|
||||
rmtree(os.path.join(self.root, str(dept_id)), ignore_errors=True)
|
||||
|
||||
def get_trace(
|
||||
self, etudid: int, *fnames: str
|
||||
) -> dict[str, list[datetime, datetime]]:
|
||||
"""Récupère la trace des justificatifs de l'étudiant"""
|
||||
trace = Trace(self.get_obj_dir(etudid))
|
||||
return trace.get_trace(fnames)
|
355
app/scodoc/sco_assiduites.py
Normal file
355
app/scodoc/sco_assiduites.py
Normal file
@ -0,0 +1,355 @@
|
||||
"""
|
||||
Ecrit par Matthias Hartmann.
|
||||
"""
|
||||
from datetime import date, datetime, time, timedelta
|
||||
|
||||
import app.scodoc.sco_utils as scu
|
||||
from app.models.assiduites import Assiduite, Justificatif
|
||||
from app.models.etudiants import Identite
|
||||
from app.models.formsemestre import FormSemestre, FormSemestreInscription
|
||||
|
||||
|
||||
class CountCalculator:
|
||||
"""Classe qui gére le comptage des assiduités"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
morning: time = time(8, 0),
|
||||
noon: time = time(12, 0),
|
||||
after_noon: time = time(14, 00),
|
||||
evening: time = time(18, 0),
|
||||
skip_saturday: bool = True,
|
||||
) -> None:
|
||||
|
||||
self.morning: time = morning
|
||||
self.noon: time = noon
|
||||
self.after_noon: time = after_noon
|
||||
self.evening: time = evening
|
||||
self.skip_saturday: bool = skip_saturday
|
||||
|
||||
delta_total: timedelta = datetime.combine(date.min, evening) - datetime.combine(
|
||||
date.min, morning
|
||||
)
|
||||
delta_lunch: timedelta = datetime.combine(
|
||||
date.min, after_noon
|
||||
) - datetime.combine(date.min, noon)
|
||||
|
||||
self.hour_per_day: float = (delta_total - delta_lunch).total_seconds() / 3600
|
||||
|
||||
self.days: list[date] = []
|
||||
self.half_days: list[tuple[date, bool]] = [] # tuple -> (date, morning:bool)
|
||||
self.hours: float = 0.0
|
||||
|
||||
self.count: int = 0
|
||||
|
||||
def reset(self):
|
||||
"""Remet à zero le compteur"""
|
||||
self.days = []
|
||||
self.half_days = []
|
||||
self.hours = 0.0
|
||||
self.count = 0
|
||||
|
||||
def add_half_day(self, day: date, is_morning: bool = True):
|
||||
"""Ajoute une demi journée dans le comptage"""
|
||||
key: tuple[date, bool] = (day, is_morning)
|
||||
if key not in self.half_days:
|
||||
self.half_days.append(key)
|
||||
|
||||
def add_day(self, day: date):
|
||||
"""Ajoute un jour dans le comptage"""
|
||||
if day not in self.days:
|
||||
self.days.append(day)
|
||||
|
||||
def check_in_morning(self, period: tuple[datetime, datetime]) -> bool:
|
||||
"""Vérifiée si la période donnée fait partie du matin
|
||||
(Test sur la date de début)
|
||||
"""
|
||||
|
||||
interval_morning: tuple[datetime, datetime] = (
|
||||
scu.localize_datetime(datetime.combine(period[0].date(), self.morning)),
|
||||
scu.localize_datetime(datetime.combine(period[0].date(), self.noon)),
|
||||
)
|
||||
|
||||
in_morning: bool = scu.is_period_overlapping(
|
||||
period, interval_morning, bornes=False
|
||||
)
|
||||
return in_morning
|
||||
|
||||
def check_in_evening(self, period: tuple[datetime, datetime]) -> bool:
|
||||
"""Vérifie si la période fait partie de l'aprèm
|
||||
(test sur la date de début)
|
||||
"""
|
||||
|
||||
interval_evening: tuple[datetime, datetime] = (
|
||||
scu.localize_datetime(datetime.combine(period[0].date(), self.after_noon)),
|
||||
scu.localize_datetime(datetime.combine(period[0].date(), self.evening)),
|
||||
)
|
||||
|
||||
in_evening: bool = scu.is_period_overlapping(period, interval_evening)
|
||||
|
||||
return in_evening
|
||||
|
||||
def compute_long_assiduite(self, assi: Assiduite):
|
||||
"""Calcule les métriques sur une assiduité longue (plus d'un jour)"""
|
||||
|
||||
pointer_date: date = assi.date_debut.date() + timedelta(days=1)
|
||||
start_hours: timedelta = assi.date_debut - scu.localize_datetime(
|
||||
datetime.combine(assi.date_debut, self.morning)
|
||||
)
|
||||
finish_hours: timedelta = assi.date_fin - scu.localize_datetime(
|
||||
datetime.combine(assi.date_fin, self.morning)
|
||||
)
|
||||
|
||||
self.add_day(assi.date_debut.date())
|
||||
self.add_day(assi.date_fin.date())
|
||||
|
||||
start_period: tuple[datetime, datetime] = (
|
||||
assi.date_debut,
|
||||
scu.localize_datetime(
|
||||
datetime.combine(assi.date_debut.date(), self.evening)
|
||||
),
|
||||
)
|
||||
|
||||
finish_period: tuple[datetime, datetime] = (
|
||||
scu.localize_datetime(datetime.combine(assi.date_fin.date(), self.morning)),
|
||||
assi.date_fin,
|
||||
)
|
||||
hours = 0.0
|
||||
for period in (start_period, finish_period):
|
||||
if self.check_in_evening(period):
|
||||
self.add_half_day(period[0].date(), False)
|
||||
if self.check_in_morning(period):
|
||||
self.add_half_day(period[0].date())
|
||||
|
||||
while pointer_date < assi.date_fin.date():
|
||||
if pointer_date.weekday() < (6 - self.skip_saturday):
|
||||
self.add_day(pointer_date)
|
||||
self.add_half_day(pointer_date)
|
||||
self.add_half_day(pointer_date, False)
|
||||
self.hours += self.hour_per_day
|
||||
hours += self.hour_per_day
|
||||
|
||||
pointer_date += timedelta(days=1)
|
||||
|
||||
self.hours += finish_hours.total_seconds() / 3600
|
||||
self.hours += self.hour_per_day - (start_hours.total_seconds() / 3600)
|
||||
|
||||
def compute_assiduites(self, assiduites: Assiduite):
|
||||
"""Calcule les métriques pour la collection d'assiduité donnée"""
|
||||
assi: Assiduite
|
||||
assiduites: list[Assiduite] = (
|
||||
assiduites.all() if isinstance(assiduites, Assiduite) else assiduites
|
||||
)
|
||||
for assi in assiduites:
|
||||
self.count += 1
|
||||
delta: timedelta = assi.date_fin - assi.date_debut
|
||||
|
||||
if delta.days > 0:
|
||||
# raise Exception(self.hours)
|
||||
self.compute_long_assiduite(assi)
|
||||
|
||||
continue
|
||||
|
||||
period: tuple[datetime, datetime] = (assi.date_debut, assi.date_fin)
|
||||
deb_date: date = assi.date_debut.date()
|
||||
if self.check_in_morning(period):
|
||||
self.add_half_day(deb_date)
|
||||
if self.check_in_evening(period):
|
||||
self.add_half_day(deb_date, False)
|
||||
|
||||
self.add_day(deb_date)
|
||||
|
||||
self.hours += delta.total_seconds() / 3600
|
||||
|
||||
def to_dict(self) -> dict[str, object]:
|
||||
"""Retourne les métriques sous la forme d'un dictionnaire"""
|
||||
return {
|
||||
"compte": self.count,
|
||||
"journee": len(self.days),
|
||||
"demi": len(self.half_days),
|
||||
"heure": round(self.hours, 2),
|
||||
}
|
||||
|
||||
|
||||
def get_assiduites_stats(
|
||||
assiduites: Assiduite, metric: str = "all", filtered: dict[str, object] = None
|
||||
) -> Assiduite:
|
||||
"""Compte les assiduités en fonction des filtres"""
|
||||
|
||||
if filtered is not None:
|
||||
deb, fin = None, None
|
||||
for key in filtered:
|
||||
if key == "etat":
|
||||
assiduites = filter_assiduites_by_etat(assiduites, filtered[key])
|
||||
elif key == "date_fin":
|
||||
fin = filtered[key]
|
||||
elif key == "date_debut":
|
||||
deb = filtered[key]
|
||||
elif key == "moduleimpl_id":
|
||||
assiduites = filter_by_module_impl(assiduites, filtered[key])
|
||||
elif key == "formsemestre":
|
||||
assiduites = filter_by_formsemestre(assiduites, filtered[key])
|
||||
elif key == "est_just":
|
||||
assiduites = filter_assiduites_by_est_just(assiduites, filtered[key])
|
||||
elif key == "user_id":
|
||||
assiduites = filter_by_user_id(assiduites, filtered[key])
|
||||
if (deb, fin) != (None, None):
|
||||
assiduites = filter_by_date(assiduites, Assiduite, deb, fin)
|
||||
|
||||
calculator: CountCalculator = CountCalculator()
|
||||
calculator.compute_assiduites(assiduites)
|
||||
count: dict = calculator.to_dict()
|
||||
|
||||
metrics: list[str] = metric.split(",")
|
||||
|
||||
output: dict = {}
|
||||
|
||||
for key, val in count.items():
|
||||
if key in metrics:
|
||||
output[key] = val
|
||||
return output if output else count
|
||||
|
||||
|
||||
def filter_assiduites_by_etat(assiduites: Assiduite, etat: str) -> Assiduite:
|
||||
"""
|
||||
Filtrage d'une collection d'assiduites en fonction de leur état
|
||||
"""
|
||||
etats: list[str] = list(etat.split(","))
|
||||
etats = [scu.EtatAssiduite.get(e, -1) for e in etats]
|
||||
return assiduites.filter(Assiduite.etat.in_(etats))
|
||||
|
||||
|
||||
def filter_assiduites_by_est_just(
|
||||
assiduites: Assiduite, est_just: bool
|
||||
) -> Justificatif:
|
||||
"""
|
||||
Filtrage d'une collection d'assiduites en fonction de s'ils sont justifiés
|
||||
"""
|
||||
return assiduites.filter_by(est_just=est_just)
|
||||
|
||||
|
||||
def filter_by_user_id(
|
||||
collection: Assiduite or Justificatif,
|
||||
user_id: int,
|
||||
) -> Justificatif:
|
||||
"""
|
||||
Filtrage d'une collection en fonction de l'user_id
|
||||
"""
|
||||
return collection.filter_by(user_id=user_id)
|
||||
|
||||
|
||||
def filter_by_date(
|
||||
collection: Assiduite or Justificatif,
|
||||
collection_cls: Assiduite or Justificatif,
|
||||
date_deb: datetime = None,
|
||||
date_fin: datetime = None,
|
||||
strict: bool = False,
|
||||
):
|
||||
"""
|
||||
Filtrage d'une collection d'assiduites en fonction d'une date
|
||||
"""
|
||||
if date_deb is None:
|
||||
date_deb = datetime.min
|
||||
if date_fin is None:
|
||||
date_fin = datetime.max
|
||||
|
||||
date_deb = scu.localize_datetime(date_deb)
|
||||
date_fin = scu.localize_datetime(date_fin)
|
||||
if not strict:
|
||||
return collection.filter(
|
||||
collection_cls.date_debut <= date_fin, collection_cls.date_fin >= date_deb
|
||||
)
|
||||
return collection.filter(
|
||||
collection_cls.date_debut < date_fin, collection_cls.date_fin > date_deb
|
||||
)
|
||||
|
||||
|
||||
def filter_justificatifs_by_etat(
|
||||
justificatifs: Justificatif, etat: str
|
||||
) -> Justificatif:
|
||||
"""
|
||||
Filtrage d'une collection de justificatifs en fonction de leur état
|
||||
"""
|
||||
etats: list[str] = list(etat.split(","))
|
||||
etats = [scu.EtatJustificatif.get(e, -1) for e in etats]
|
||||
return justificatifs.filter(Justificatif.etat.in_(etats))
|
||||
|
||||
|
||||
def filter_by_module_impl(
|
||||
assiduites: Assiduite, module_impl_id: int or None
|
||||
) -> Assiduite:
|
||||
"""
|
||||
Filtrage d'une collection d'assiduites en fonction de l'ID du module_impl
|
||||
"""
|
||||
return assiduites.filter(Assiduite.moduleimpl_id == module_impl_id)
|
||||
|
||||
|
||||
def filter_by_formsemestre(assiduites_query: Assiduite, formsemestre: FormSemestre):
|
||||
"""
|
||||
Filtrage d'une collection d'assiduites en fonction d'un formsemestre
|
||||
"""
|
||||
|
||||
if formsemestre is None:
|
||||
return assiduites_query.filter(False)
|
||||
|
||||
assiduites_query = (
|
||||
assiduites_query.join(Identite, Assiduite.etudid == Identite.id)
|
||||
.join(
|
||||
FormSemestreInscription,
|
||||
Identite.id == FormSemestreInscription.etudid,
|
||||
)
|
||||
.filter(FormSemestreInscription.formsemestre_id == formsemestre.id)
|
||||
)
|
||||
|
||||
assiduites_query = assiduites_query.filter(
|
||||
Assiduite.date_debut >= formsemestre.date_debut
|
||||
)
|
||||
return assiduites_query.filter(Assiduite.date_fin <= formsemestre.date_fin)
|
||||
|
||||
|
||||
def justifies(justi: Justificatif, obj: bool = False) -> list[int]:
|
||||
"""
|
||||
Retourne la liste des assiduite_id qui sont justifié par la justification
|
||||
Une assiduité est justifiée si elle est COMPLETEMENT ou PARTIELLEMENT comprise dans la plage du justificatif
|
||||
et que l'état du justificatif est "valide"
|
||||
renvoie des id si obj == False, sinon les Assiduités
|
||||
"""
|
||||
|
||||
if justi.etat != scu.EtatJustificatif.VALIDE:
|
||||
return []
|
||||
|
||||
assiduites_query: Assiduite = (
|
||||
Assiduite.query.join(Justificatif, Assiduite.etudid == Justificatif.etudid)
|
||||
.filter(Assiduite.etat != scu.EtatAssiduite.PRESENT)
|
||||
.filter(
|
||||
Assiduite.date_debut <= justi.date_fin,
|
||||
Assiduite.date_fin >= justi.date_debut,
|
||||
)
|
||||
)
|
||||
|
||||
if not obj:
|
||||
return [assi.id for assi in assiduites_query.all()]
|
||||
|
||||
return assiduites_query
|
||||
|
||||
|
||||
def get_all_justified(
|
||||
etudid: int, date_deb: datetime = None, date_fin: datetime = None
|
||||
) -> list[Assiduite]:
|
||||
"""Retourne toutes les assiduités justifiées sur une période"""
|
||||
|
||||
if date_deb is None:
|
||||
date_deb = datetime.min
|
||||
if date_fin is None:
|
||||
date_fin = datetime.max
|
||||
|
||||
date_deb = scu.localize_datetime(date_deb)
|
||||
date_fin = scu.localize_datetime(date_fin)
|
||||
justified = Assiduite.query.filter_by(est_just=True, etudid=etudid)
|
||||
after = filter_by_date(
|
||||
justified,
|
||||
Assiduite,
|
||||
date_deb,
|
||||
date_fin,
|
||||
)
|
||||
return after
|
Loading…
x
Reference in New Issue
Block a user