module assiduité: corrections linter

This commit is contained in:
iziram 2023-02-14 10:57:02 +01:00
parent 84ac334b13
commit 5aeffcbf1d
12 changed files with 339 additions and 139 deletions

View File

@ -182,6 +182,7 @@ def assiduites(etudid: int = None, with_query: bool = False):
@scodoc
@permission_required(Permission.ScoView)
def assiduites_formsemestre(formsemestre_id: int, with_query: bool = False):
"""Retourne toutes les assiduités du formsemestre"""
formsemestre: FormSemestre = None
formsemestre_id = int(formsemestre_id)
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()
@ -224,6 +225,7 @@ def assiduites_formsemestre(formsemestre_id: int, with_query: bool = False):
def count_assiduites_formsemestre(
formsemestre_id: int = None, with_query: bool = False
):
"""Comptage des assiduités du formsemestre"""
formsemestre: FormSemestre = None
formsemestre_id = int(formsemestre_id)
formsemestre = FormSemestre.query.filter_by(id=formsemestre_id).first()

View File

@ -288,7 +288,7 @@ def justif_edit(justif_id: int):
etuid=justificatif_unique.etudid
).all()
if is_period_conflicting(deb, fin, justificatifs_list):
if is_period_conflicting(deb, fin, justificatifs_list, Justificatif):
errors.append(
"Modification de la plage horaire impossible: conflit avec les autres justificatifs"
)
@ -357,8 +357,8 @@ def _delete_singular(justif_id: int, database):
# Partie archivage
@bp.route("/justificatif/import/<int:justif_id>", methods=["POST"])
@api_web_bp.route("/justificatif/import/<int:justif_id>", methods=["POST"])
@bp.route("/justificatif/<int:justif_id>/import", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/import", methods=["POST"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
@ -402,8 +402,8 @@ def justif_import(justif_id: int = None):
return json_error(404, err.args[0])
@bp.route("/justificatif/export/<int:justif_id>/<filename>", methods=["POST"])
@api_web_bp.route("/justificatif/export/<int:justif_id>/<filename>", methods=["POST"])
@bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/export/<filename>", methods=["POST"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
@ -433,8 +433,8 @@ def justif_export(justif_id: int = None, filename: str = None):
return json_error(404, err.args[0])
@bp.route("/justificatif/remove/<int:justif_id>", methods=["POST"])
@api_web_bp.route("/justificatif/remove/<int:justif_id>", methods=["POST"])
@bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"])
@api_web_bp.route("/justificatif/<int:justif_id>/remove", methods=["POST"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
@ -497,8 +497,8 @@ def justif_remove(justif_id: int = None):
return jsonify({"response": "removed"})
@bp.route("/justificatif/list/<int:justif_id>", methods=["GET"])
@api_web_bp.route("/justificatif/list/<int:justif_id>", methods=["GET"])
@bp.route("/justificatif/<int:justif_id>/list", methods=["GET"])
@api_web_bp.route("/justificatif/<int:justif_id>/list", methods=["GET"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
@ -528,13 +528,13 @@ def justif_list(justif_id: int = None):
# Partie justification
@bp.route("/justificatif/justified/<int:justif_id>", methods=["GET"])
@api_web_bp.route("/justificatif/justified/<int:justif_id>", methods=["GET"])
@bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"])
@api_web_bp.route("/justificatif/<int:justif_id>/justifies", methods=["GET"])
@scodoc
@login_required
@permission_required(Permission.ScoView)
# @permission_required(Permission.ScoAssiduiteChange)
def justif_justified(justif_id: int = None):
def justif_justifies(justif_id: int = None):
"""
Liste assiduite_id justifiées par le justificatif
"""

View File

@ -10,7 +10,6 @@ from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_utils import (
EtatAssiduite,
EtatJustificatif,
is_period_overlapping,
localize_datetime,
)
@ -52,6 +51,7 @@ class Assiduite(db.Model):
entry_date = db.Column(db.DateTime(timezone=True), server_default=db.func.now())
def to_dict(self, format_api=True) -> dict:
"""Retourne la représentation json de l'assiduité"""
etat = self.etat
if format_api:

View File

@ -1,9 +1,14 @@
"""
Gestion de l'archivage des justificatifs
Ecrit par Matthias HARTMANN
"""
import os
from shutil import rmtree
from app.models import Identite
from app.scodoc.sco_archives import BaseArchiver
from app.scodoc.sco_exceptions import ScoValueError
from app.models import Identite, Departement
from shutil import rmtree
import os
class JustificatifArchiver(BaseArchiver):
@ -109,6 +114,9 @@ class JustificatifArchiver(BaseArchiver):
self.set_dept_id(etud.dept_id)
def remove_dept_archive(self, dept_id: int = None):
"""
Supprime toutes les archives d'un département (ou de tous les départements)
"""
self.set_dept_id(1)
self.initialize()

View File

@ -1,13 +1,21 @@
"""
Ensembles des fonctions utilisant les Assiduités et/ou Justificatifs
Ecrit par Matthias Hartmann.
"""
from datetime import date, datetime, time, timedelta
import app.scodoc.sco_utils as scu
from app.models.assiduites import Assiduite, Justificatif
from app.models.etudiants import Identite
from app.models.formsemestre import FormSemestre, FormSemestreInscription
from app.profiler import Profiler
class CountCalculator:
"""Classe qui gére le comptage des assiduités"""
def __init__(
self,
morning: time = time(8, 0),
@ -39,21 +47,27 @@ class CountCalculator:
self.count: int = 0
def reset(self):
"""Remet à zero le compteur"""
self.days = []
self.half_days = []
self.hours = 0.0
self.count = 0
def add_half_day(self, day: date, is_morning: bool = True):
"""Ajoute une demi journée dans le comptage"""
key: tuple[date, bool] = (day, is_morning)
if key not in self.half_days:
self.half_days.append(key)
def add_day(self, day: date):
"""Ajoute un jour dans le comptage"""
if day not in self.days:
self.days.append(day)
def check_in_morning(self, period: tuple[datetime, datetime]) -> bool:
"""Vérifiée si la période donnée fait partie du matin
(Test sur la date de début)
"""
interval_morning: tuple[datetime, datetime] = (
scu.localize_datetime(datetime.combine(period[0].date(), self.morning)),
@ -66,6 +80,9 @@ class CountCalculator:
return in_morning
def check_in_evening(self, period: tuple[datetime, datetime]) -> bool:
"""Vérifie si la période fait partie de l'aprèm
(test sur la date de début)
"""
interval_evening: tuple[datetime, datetime] = (
scu.localize_datetime(datetime.combine(period[0].date(), self.after_noon)),
@ -77,6 +94,7 @@ class CountCalculator:
return in_evening
def compute_long_assiduite(self, assi: Assiduite):
"""Calcule les métriques sur une assiduité longue (plus d'un jour)"""
pointer_date: date = assi.date_debut.date() + timedelta(days=1)
start_hours: timedelta = assi.date_debut - scu.localize_datetime(
@ -121,6 +139,7 @@ class CountCalculator:
self.hours += self.hour_per_day - (start_hours.total_seconds() / 3600)
def compute_assiduites(self, assiduites: Assiduite):
"""Calcule les métriques pour la collection d'assiduité donnée"""
assi: Assiduite
assiduites: list[Assiduite] = (
assiduites.all() if isinstance(assiduites, Assiduite) else assiduites
@ -147,6 +166,7 @@ class CountCalculator:
self.hours += delta.total_seconds() / 3600
def to_dict(self) -> dict[str, object]:
"""Retourne les métriques sous la forme d'un dictionnaire"""
return {
"compte": self.count,
"journee": len(self.days),
@ -158,6 +178,7 @@ class CountCalculator:
def get_assiduites_stats(
assiduites: Assiduite, metric: str = "all", filtered: dict[str, object] = None
) -> Assiduite:
"""Compte les assiduités en fonction des filtres"""
if filtered is not None:
deb, fin = None, None
@ -298,6 +319,10 @@ def justifies(justi: Justificatif, obj: bool = False) -> list[int]:
def get_all_justified(
justificatifs: Justificatif, date_deb: datetime = None, date_fin: datetime = None
) -> list[Assiduite]:
"""Retourne toutes les assiduités justifiées par les justificatifs donnés"""
# TODO: Forcer le filtrage des assiduités en fonction d'une période
# => Cas d'un justificatif en bordure de période
if date_deb is None:
date_deb = datetime.min
if date_fin is None:

View File

@ -58,7 +58,7 @@ from flask import flash, url_for, make_response, jsonify
from werkzeug.http import HTTP_STATUS_CODES
from config import Config
from app import log, db
from app import log
from app.scodoc.sco_vdi import ApoEtapeVDI
from app.scodoc.codes_cursus import NOTES_TOLERANCE, CODES_EXPL
from app.scodoc import sco_xml
@ -91,7 +91,7 @@ ETATS_INSCRIPTION = {
}
def printProgressBar(
def print_progress_bar(
iteration,
total,
prefix="",
@ -115,28 +115,30 @@ def printProgressBar(
autosize - Optional : Choisir automatiquement la taille de la barre en fonction du terminal (Bool)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
color = ProgressBarColors.RED
color = TerminalColor.RED
if 50 >= float(percent) > 25:
color = ProgressBarColors.MAGENTA
color = TerminalColor.MAGENTA
if 75 >= float(percent) > 50:
color = ProgressBarColors.BLUE
color = TerminalColor.BLUE
if 90 >= float(percent) > 75:
color = ProgressBarColors.CYAN
color = TerminalColor.CYAN
if 100 >= float(percent) > 90:
color = ProgressBarColors.GREEN
color = TerminalColor.GREEN
styling = f"{prefix} |{fill}| {percent}% {suffix}"
if autosize:
cols, _ = get_terminal_size(fallback=(length, 1))
length = cols - len(styling)
filledLength = int(length * iteration // total)
bar = fill * filledLength + "-" * (length - filledLength)
print(f"\r{color}{styling.replace(fill, bar)}{ProgressBarColors.RESET}", end="\r")
filled_length = int(length * iteration // total)
pg_bar = fill * filled_length + "-" * (length - filled_length)
print(f"\r{color}{styling.replace(fill, pg_bar)}{TerminalColor.RESET}", end="\r")
# Affiche une nouvelle ligne vide
if iteration == total:
print(f"\n{finish_msg}")
class ProgressBarColors:
class TerminalColor:
"""Ensemble de couleur pour terminaux"""
BLUE = "\033[94m"
CYAN = "\033[96m"
GREEN = "\033[92m"
@ -153,10 +155,12 @@ class BiDirectionalEnum(Enum):
@classmethod
def contains(cls, attr: str):
"""Vérifie sur un attribut existe dans l'enum"""
return attr.upper() in cls._member_names_
@classmethod
def get(cls, attr: str, default: any = None):
"""Récupère une valeur à partir de son attribut"""
val = None
try:
val = cls[attr.upper()]
@ -206,11 +210,12 @@ def is_iso_formated(date: str, convert=False) -> bool or datetime.datetime or No
try:
date: datetime.datetime = dtparser.isoparse(date)
return date if convert else True
except Exception:
except (dtparser.ParserError, ValueError, TypeError):
return None if convert else False
def localize_datetime(date: datetime.datetime or str) -> datetime.datetime:
"""Ajoute un timecode UTC à la date donnée."""
if isinstance(date, str):
date = is_iso_formated(date, convert=True)

View File

@ -11,7 +11,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "dbcf2175e87f"
down_revision = "5c7b208355df"
down_revision = "d8288b7f0a3e"
branch_labels = None
depends_on = None

View File

@ -34,7 +34,15 @@ COUNT_FIELDS = {"compte": int, "journee": int, "demi": int, "heure": float}
TO_REMOVE = []
def check_fields(data, fields: dict = None):
def check_fields(data: dict, fields: dict = None):
"""
Cette fonction permet de vérifier que le dictionnaire data
contient les bonnes clés et les bons types de valeurs.
Args:
data (dict): un dictionnaire (json de retour de l'api)
fields (dict, optional): Un dictionnaire représentant les clés et les types d'une réponse.
"""
if fields is None:
fields = ASSIDUITES_FIELDS
assert set(data.keys()) == set(fields.keys())
@ -45,7 +53,19 @@ def check_fields(data, fields: dict = None):
assert isinstance(data[key], fields[key])
def check_failure_get(path, headers, err=None):
def check_failure_get(path: str, headers: dict, err: str = None):
"""
Cette fonction vérifiée que la requête GET renvoie bien un 404
Args:
path (str): la route de l'api
headers (dict): le token d'auth de l'api
err (str, optional): L'erreur qui est sensée être fournie par l'api.
Raises:
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
"""
try:
GET(path=path, headers=headers)
# ^ Renvoi un 404
@ -56,7 +76,20 @@ def check_failure_get(path, headers, err=None):
raise APIError("Le GET n'aurait pas du fonctionner")
def check_failure_post(path, headers, data, err=None):
def check_failure_post(path: str, headers: dict, data: dict, err: str = None):
"""
Cette fonction vérifiée que la requête POST renvoie bien un 404
Args:
path (str): la route de l'api
headers (dict): le token d'auth
data (dict): un dictionnaire (json) à envoyer
err (str, optional): L'erreur qui est sensée être fournie par l'api.
Raises:
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
"""
try:
data = POST_JSON(path=path, headers=headers, data=data)
# ^ Renvoi un 404
@ -68,6 +101,18 @@ def check_failure_post(path, headers, data, err=None):
def create_data(etat: str, day: str, module: int = None, desc: str = None):
"""
Permet de créer un dictionnaire assiduité
Args:
etat (str): l'état de l'assiduité (PRESENT,ABSENT,RETARD)
day (str): Le jour de l'assiduité
module (int, optional): Le moduleimpl_id associé
desc (str, optional): Une description de l'assiduité (eg: motif retard )
Returns:
dict: la représentation d'une assiduité
"""
data = {
"date_debut": f"2022-01-{day}T08:00",
"date_fin": f"2022-01-{day}T10:00",
@ -83,6 +128,7 @@ def create_data(etat: str, day: str, module: int = None, desc: str = None):
def test_route_assiduite(api_headers):
"""test de la route /assiduite/<assiduite_id:int>"""
# Bon fonctionnement == id connu
data = GET(path="/assiduite/1", headers=api_headers)
@ -97,6 +143,7 @@ def test_route_assiduite(api_headers):
def test_route_count_assiduites(api_headers):
"""test de la route /assiduites/<etudid:int>/count"""
# Bon fonctionnement
@ -117,6 +164,7 @@ def test_route_count_assiduites(api_headers):
def test_route_assiduites(api_headers):
"""test de la route /assiduites/<etudid:int>"""
# Bon fonctionnement
@ -136,6 +184,7 @@ def test_route_assiduites(api_headers):
def test_route_formsemestre_assiduites(api_headers):
"""test de la route /assiduites/formsemestre/<formsemestre_id:int>"""
# Bon fonctionnement
@ -165,6 +214,7 @@ def test_route_formsemestre_assiduites(api_headers):
def test_route_count_formsemestre_assiduites(api_headers):
"""test de la route /assiduites/formsemestre/<formsemestre_id:int>/count"""
# Bon fonctionnement
@ -193,6 +243,7 @@ def test_route_count_formsemestre_assiduites(api_headers):
def test_route_create(api_headers):
"""test de la route /assiduite/<etudid:int>/create"""
# -== Unique ==-
@ -272,6 +323,7 @@ def test_route_create(api_headers):
def test_route_edit(api_headers):
"""test de la route /assiduite/<assiduite_id:int>/edit"""
# Bon fonctionnement
@ -296,6 +348,7 @@ def test_route_edit(api_headers):
def test_route_delete(api_headers):
"""test de la route /assiduite/delete"""
# -== Unique ==-
# Bon fonctionnement

View File

@ -7,15 +7,15 @@ Ecrit par HARTMANN Matthias
from random import randint
import requests
from tests.api.setup_test_api import (
API_URL,
CHECK_CERTIFICATE,
GET,
POST_JSON,
APIError,
api_headers,
API_URL,
CHECK_CERTIFICATE,
)
import requests
ETUDID = 1
FAUX = 42069
@ -39,6 +39,14 @@ TO_REMOVE = []
def check_fields(data, fields: dict = None):
"""
Cette fonction permet de vérifier que le dictionnaire data
contient les bonnes clés et les bons types de valeurs.
Args:
data (dict): un dictionnaire (json de retour de l'api)
fields (dict, optional): Un dictionnaire représentant les clés et les types d'une réponse.
"""
if fields is None:
fields = JUSTIFICATIFS_FIELDS
assert set(data.keys()) == set(fields.keys())
@ -50,6 +58,17 @@ def check_fields(data, fields: dict = None):
def check_failure_get(path, headers, err=None):
"""
Cette fonction vérifiée que la requête GET renvoie bien un 404
Args:
path (str): la route de l'api
headers (dict): le token d'auth de l'api
err (str, optional): L'erreur qui est sensée être fournie par l'api.
Raises:
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
"""
try:
GET(path=path, headers=headers)
# ^ Renvoi un 404
@ -61,6 +80,18 @@ def check_failure_get(path, headers, err=None):
def check_failure_post(path, headers, data, err=None):
"""
Cette fonction vérifiée que la requête POST renvoie bien un 404
Args:
path (str): la route de l'api
headers (dict): le token d'auth
data (dict): un dictionnaire (json) à envoyer
err (str, optional): L'erreur qui est sensée être fournie par l'api.
Raises:
APIError: Une erreur car la requête a fonctionné (mauvais comportement)
"""
try:
data = POST_JSON(path=path, headers=headers, data=data)
# ^ Renvoi un 404
@ -72,6 +103,17 @@ def check_failure_post(path, headers, data, err=None):
def create_data(etat: str, day: str, raison: str = None):
"""
Permet de créer un dictionnaire assiduité
Args:
etat (str): l'état du justificatif (VALIDE,NON_VALIDE,MODIFIE, ATTENTE)
day (str): Le jour du justificatif
raison (str, optional): Une description du justificatif (eg: motif retard )
Returns:
dict: la représentation d'une assiduité
"""
data = {
"date_debut": f"2022-01-{day}T08:00",
"date_fin": f"2022-01-{day}T10:00",
@ -84,6 +126,7 @@ def create_data(etat: str, day: str, raison: str = None):
def test_route_justificatif(api_headers):
"""test de la route /justificatif/<justif_id:int>"""
# Bon fonctionnement == id connu
data = GET(path="/justificatif/1", headers=api_headers)
@ -98,7 +141,7 @@ def test_route_justificatif(api_headers):
def test_route_justificatifs(api_headers):
"""test de la route /justificatifs/<etudid:int>"""
# Bon fonctionnement
data = GET(path=f"/justificatifs/{ETUDID}", headers=api_headers)
@ -117,7 +160,7 @@ def test_route_justificatifs(api_headers):
def test_route_create(api_headers):
"""test de la route /justificatif/<justif_id:int>/create"""
# -== Unique ==-
# Bon fonctionnement
@ -198,7 +241,7 @@ def test_route_create(api_headers):
def test_route_edit(api_headers):
"""test de la route /justificatif/<justif_id:int>/edit"""
# Bon fonctionnement
data = {"etat": "modifie", "raison": "test"}
@ -209,8 +252,6 @@ def test_route_edit(api_headers):
res = POST_JSON(f"/justificatif/{TO_REMOVE[1]}/edit", data, api_headers)
assert res == {"OK": True}
# TODO: Modification date deb / fin
# Mauvais fonctionnement
check_failure_post(f"/justificatif/{FAUX}/edit", api_headers, data)
@ -224,6 +265,7 @@ def test_route_edit(api_headers):
def test_route_delete(api_headers):
"""test de la route /justificatif/delete"""
# -== Unique ==-
# Bon fonctionnement
@ -273,18 +315,18 @@ def send_file(justif_id: int, filename: str, headers):
Envoi un fichier vers la route d'importation
"""
with open(filename, "rb") as file:
url: str = API_URL + f"/justificatif/import/{justif_id}"
r = requests.post(
url: str = API_URL + f"/justificatif/{justif_id}/import"
req = requests.post(
url,
files={filename: file},
headers=headers,
verify=CHECK_CERTIFICATE,
)
if r.status_code != 200:
raise APIError(f"erreur status={r.status_code} !", r.json())
else:
return r.json()
if req.status_code != 200:
raise APIError(f"erreur status={req.status_code} !", req.json())
return req.json()
def check_failure_send(
@ -293,8 +335,21 @@ def check_failure_send(
filename: str = "tests/api/test_api_justificatif.txt",
err: str = None,
):
"""
Vérifie si l'envoie d'un fichier renvoie bien un 404
Args:
justif_id (int): l'id du justificatif
headers (dict): token d'auth de l'api
filename (str, optional): le chemin vers le fichier.
Defaults to "tests/api/test_api_justificatif.txt".
err (str, optional): l'erreur attendue.
Raises:
APIError: Si l'envoie fonction (mauvais comportement)
"""
try:
data = send_file(justif_id, filename, headers)
send_file(justif_id, filename, headers)
# ^ Renvoi un 404
except APIError as api_err:
if err is not None:
@ -304,6 +359,7 @@ def check_failure_send(
def test_import_justificatif(api_headers):
"""test de la route /justificatif/<justif_id:int>/import"""
# Bon fonctionnement
@ -324,31 +380,45 @@ def test_import_justificatif(api_headers):
def test_list_justificatifs(api_headers):
"""test de la route /justificatif/<justif_id:int>/list"""
# Bon fonctionnement
res: list = GET("/justificatif/list/1", api_headers)
res: list = GET("/justificatif/1/list", api_headers)
assert isinstance(res, list)
assert len(res) == 2
res: list = GET("/justificatif/list/2", api_headers)
res: list = GET("/justificatif/2/list", api_headers)
assert isinstance(res, list)
assert len(res) == 0
# Mauvais fonctionnement
check_failure_get(f"/justificatif/list/{FAUX}", api_headers)
check_failure_get(f"/justificatif/{FAUX}/list", api_headers)
def post_export(id: int, fname: str, api_headers):
url: str = API_URL + f"/justificatif/export/{id}/{fname}"
def post_export(justif_id: int, fname: str, api_headers):
"""
Envoie une requête poste sans data et la retourne
Args:
id (int): justif_id
fname (str): nom du fichier (coté serv)
api_headers (dict): token auth de l'api
Returns:
request: la réponse de l'api
"""
url: str = API_URL + f"/justificatif/{justif_id}/export/{fname}"
res = requests.post(url, headers=api_headers)
return res
def test_export(api_headers):
"""test de la route /justificatif/<justif_id:int>/export/<filename:str>"""
# Bon fonctionnement
assert post_export(1, "test_api_justificatif.txt", api_headers).status_code == 200
@ -362,6 +432,7 @@ def test_export(api_headers):
def test_remove_justificatif(api_headers):
"""test de la route /justificatif/<justif_id:int>/remove"""
# Bon fonctionnement
@ -370,39 +441,41 @@ def test_remove_justificatif(api_headers):
filename: str = "tests/api/test_api_justificatif2.txt"
send_file(2, filename, api_headers)
res: dict = POST_JSON("/justificatif/remove/1", {"remove": "all"}, api_headers)
res: dict = POST_JSON("/justificatif/1/remove", {"remove": "all"}, api_headers)
assert res == {"response": "removed"}
assert len(GET("/justificatif/list/1", api_headers)) == 0
assert len(GET("/justificatif/1/list", api_headers)) == 0
res: dict = POST_JSON(
"/justificatif/remove/2",
"/justificatif/2/remove",
{"remove": "list", "filenames": ["test_api_justificatif2.txt"]},
api_headers,
)
assert res == {"response": "removed"}
assert len(GET("/justificatif/list/2", api_headers)) == 1
assert len(GET("/justificatif/2/list", api_headers)) == 1
res: dict = POST_JSON(
"/justificatif/remove/2",
"/justificatif/2/remove",
{"remove": "list", "filenames": ["test_api_justificatif.txt"]},
api_headers,
)
assert res == {"response": "removed"}
assert len(GET("/justificatif/list/2", api_headers)) == 0
assert len(GET("/justificatif/2/list", api_headers)) == 0
# Mauvais fonctionnement
check_failure_post("/justificatif/remove/2", api_headers, {})
check_failure_post(f"/justificatif/remove/{FAUX}", api_headers, {"remove": "all"})
check_failure_post("/justificatif/remove/1", api_headers, {"remove": "all"})
check_failure_post("/justificatif/2/remove", api_headers, {})
check_failure_post(f"/justificatif/{FAUX}/remove", api_headers, {"remove": "all"})
check_failure_post("/justificatif/1/remove", api_headers, {"remove": "all"})
def test_justified(api_headers):
def test_justifies(api_headers):
"""test la route /justificatif/<justif_id:int>/justifies"""
# Bon fonctionnement
res: list = GET("/justificatif/justified/1", api_headers)
res: list = GET("/justificatif/1/justifies", api_headers)
assert isinstance(res, list)
# Mauvais fonctionnement
check_failure_get(f"/justificatif/justified/{FAUX}", api_headers)
check_failure_get(f"/justificatif/{FAUX}/justifies", api_headers)

View File

@ -26,6 +26,8 @@ from tools import migrate_abs_to_assiduites, downgrade_module
class BiInt(int, scu.BiDirectionalEnum):
"""Classe pour tester la classe BiDirectionalEnum"""
A = 1
B = 2
@ -136,7 +138,7 @@ def test_general(test_client):
etud_faux_dict = g_fake.create_etud(code_nip=None, prenom="etudfaux")
etud_faux = Identite.query.filter_by(id=etud_faux_dict["id"]).first()
verif_migration_abs_assiduites(g_fake)
verif_migration_abs_assiduites()
ajouter_assiduites(etuds, moduleimpls, etud_faux)
justificatifs: list[Justificatif] = ajouter_justificatifs(etuds[0])
@ -148,7 +150,8 @@ def test_general(test_client):
editer_supprimer_justificatif(etuds[0])
def verif_migration_abs_assiduites(g_fake):
def verif_migration_abs_assiduites():
"""Vérification que le script de migration fonctionne correctement"""
downgrade_module(assiduites=True, justificatifs=True)
etudid: int = 1
@ -179,7 +182,7 @@ def verif_migration_abs_assiduites(g_fake):
"02/01/2023",
"10/01/2023",
2,
), # 2 justificatif 02/01: 08h -> 06/01: 18h & justificatif 09/01: 08h -> 10/01: 18h | 14dj
), # 2 justificatif 02/01: 08h -> 06/01: 18h & justificatif 09/01: 08h -> 10/01: 18h | 14dj
(
"19/01/2023",
"19/01/2023",
@ -203,12 +206,14 @@ def verif_migration_abs_assiduites(g_fake):
assert Assiduite.query.count() == 6, "Erreur migration assiduites"
assert Justificatif.query.count() == 4, "Erreur migration justificatifs"
essais_cache(etudid, g_fake)
essais_cache(etudid)
downgrade_module(assiduites=True, justificatifs=True)
def essais_cache(etudid, g_fake):
def essais_cache(etudid):
"""Vérification des fonctionnalités du cache TODO:WIP"""
date_deb: str = "2023-01-01T07:00"
date_fin: str = "2023-03-31T19:00"
@ -231,6 +236,7 @@ def essais_cache(etudid, g_fake):
def ajouter_justificatifs(etud):
"""test de l'ajout des justificatifs"""
obj_justificatifs = [
{
@ -318,7 +324,7 @@ def verifier_filtrage_justificatifs(etud: Identite, justificatifs: list[Justific
), "Filtrage de l'état 'valide' mauvais"
assert (
scass.filter_justificatifs_by_etat(etud.justificatifs, "attente").count() == 1
), f"Filtrage de l'état 'attente' mauvais"
), "Filtrage de l'état 'attente' mauvais"
assert (
scass.filter_justificatifs_by_etat(etud.justificatifs, "modifie").count() == 1
), "Filtrage de l'état 'modifie' mauvais"
@ -404,7 +410,7 @@ def verifier_filtrage_justificatifs(etud: Identite, justificatifs: list[Justific
# Justifications des assiduites
assert len(scass.justifies(justificatifs[2])) == 1, "Justifications mauvais"
assert len(scass.justifies(justificatifs[0])) == 0, f"Justifications mauvais"
assert len(scass.justifies(justificatifs[0])) == 0, "Justifications mauvais"
def editer_supprimer_justificatif(etud: Identite):

View File

@ -1,12 +1,27 @@
"""
Commande permettant de supprimer les assiduités et les justificatifs
Ecrit par Matthias HARTMANN
"""
from app import db
from app.models import Justificatif, Assiduite, Departement
from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
from app.scodoc.sco_utils import ProgressBarColors
from app.scodoc.sco_utils import TerminalColor
def downgrade_module(
dept: str = None, assiduites: bool = False, justificatifs: bool = False
):
"""
Supprime les assiduités et/ou justificatifs du dept sélectionné ou de tous les départements
Args:
dept (str, optional): l'acronym du département. Par défaut tous les départements.
assiduites (bool, optional): suppression des assiduités. Par défaut : Non
justificatifs (bool, optional): supression des justificatifs. Par défaut : Non
"""
dept_etudid: list[int] = None
dept_id: int = None
@ -28,7 +43,7 @@ def downgrade_module(
db.session.commit()
print(
f"{ProgressBarColors.GREEN}Le module assiduité a bien été remis à zero.{ProgressBarColors.RESET}"
f"{TerminalColor.GREEN}Le module assiduité a bien été remis à zero.{TerminalColor.RESET}"
)

View File

@ -1,32 +1,38 @@
# Script de migration des données de la base "absences" -> "assiduites"/"justificatifs"
"""
Script de migration des données de la base "absences" -> "assiduites"/"justificatifs"
Ecrit par Matthias HARTMANN
"""
from datetime import date, datetime, time, timedelta
from json import dump, dumps
from sqlalchemy import not_
from app import db
from app.profiler import Profiler
from app.models import (
Assiduite,
Justificatif,
Absence,
Identite,
ModuleImplInscription,
Assiduite,
Departement,
Identite,
Justificatif,
ModuleImplInscription,
)
from app.profiler import Profiler
from app.scodoc.sco_utils import (
EtatAssiduite,
EtatJustificatif,
TerminalColor,
localize_datetime,
ProgressBarColors,
printProgressBar,
print_progress_bar,
)
from datetime import time, datetime, date, timedelta
from json import dump, dumps
from sqlalchemy import not_
class _Merger:
""""""
"""pour typage"""
class _glob:
"""variables globales du script"""
PROBLEMS: dict[int, list[str]] = {}
CURRENT_ETU: list = []
MODULES: list[tuple[int, int]] = []
@ -41,42 +47,43 @@ class _glob:
class _Merger:
def __init__(self, abs: Absence, est_abs: bool) -> None:
self.deb = (abs.jour, abs.matin)
self.fin = (abs.jour, abs.matin)
self.moduleimpl = abs.moduleimpl_id
self.etudid = abs.etudid
def __init__(self, abs_: Absence, est_abs: bool) -> None:
self.deb = (abs_.jour, abs_.matin)
self.fin = (abs_.jour, abs_.matin)
self.moduleimpl = abs_.moduleimpl_id
self.etudid = abs_.etudid
self.est_abs = est_abs
self.raison = abs.description
self.raison = abs_.description
def merge(self, abs: Absence) -> bool:
def merge(self, abs_: Absence) -> bool:
"""Fusionne les absences"""
if self.etudid != abs.etudid:
if self.etudid != abs_.etudid:
return False
# Cas d'une même absence enregistrée plusieurs fois
if self.fin == (abs.jour, abs.matin):
if self.fin == (abs_.jour, abs_.matin):
self.moduleimpl = None
else:
if self.fin[1]:
if abs.jour != self.fin[0]:
if abs_.jour != self.fin[0]:
return False
else:
day_after: date = abs.jour - timedelta(days=1) == self.fin[0]
if not (day_after and abs.matin):
day_after: date = abs_.jour - timedelta(days=1) == self.fin[0]
if not (day_after and abs_.matin):
return False
self.fin = (abs.jour, abs.matin)
self.fin = (abs_.jour, abs_.matin)
return True
@staticmethod
def _tuple_to_date(tuple, end=False):
if tuple[1]:
def _tuple_to_date(couple: tuple[date, bool], end=False):
if couple[1]:
time_ = _glob.NOON if end else _glob.MORNING
date_ = datetime.combine(tuple[0], time_)
date_ = datetime.combine(couple[0], time_)
else:
time_ = _glob.EVENING if end else _glob.NOON
date_ = datetime.combine(tuple[0], time_)
date_ = datetime.combine(couple[0], time_)
d = localize_datetime(date_)
return d
@ -108,20 +115,21 @@ class _Merger:
return retour
def export(self):
objects = []
"""Génère un nouvel objet Assiduité ou Justificatif"""
obj: Assiduite or Justificatif = None
if self.est_abs:
_glob.COMPTE[0] += 1
objects.append(self._to_assi())
obj = self._to_assi()
else:
_glob.COMPTE[1] += 1
objects.append(self._to_justif())
obj = self._to_justif()
db.session.add_all(objects)
db.session.add(obj)
class _Statistics:
def __init__(self) -> None:
self.object: dict = {"total": 0}
self.object: dict[str, dict or int] = {"total": 0}
self.year: int = None
def __set_year(self, year: int):
@ -138,41 +146,46 @@ class _Statistics:
self.object[self.year]["etuds_inexistant"].append(etudid)
return self
def __add_abs(self, abs: int, err: str):
if abs not in self.object[self.year]["abs_invalide"]:
self.object[self.year]["abs_invalide"][abs] = [err]
def __add_abs(self, abs_: int, err: str):
if abs_ not in self.object[self.year]["abs_invalide"]:
self.object[self.year]["abs_invalide"][abs_] = [err]
else:
self.object[self.year]["abs_invalide"][abs].append(err)
self.object[self.year]["abs_invalide"][abs_].append(err)
return self
def add_problem(self, abs: Absence, err: str):
abs.jour: date
pivot: date = date(abs.jour.year, 9, 15)
year: int = abs.jour.year
if pivot < abs.jour:
def add_problem(self, abs_: Absence, err: str):
"""Ajoute un nouveau problème dans les statistiques"""
abs_.jour: date
pivot: date = date(abs_.jour.year, 9, 15)
year: int = abs_.jour.year
if pivot < abs_.jour:
year += 1
self.__set_year(year)
if err == "Etudiant inexistant":
self.__add_etud(abs.etudid)
self.__add_etud(abs_.etudid)
else:
self.__add_abs(abs.id, err)
self.__add_abs(abs_.id, err)
self.object["total"] += 1
def compute_stats(self) -> dict:
"""Comptage des statistiques"""
stats: dict = {"total": self.object["total"]}
for year in self.object:
for year, item in self.object.items():
if year == "total":
continue
stats[year] = {}
stats[year]["etuds_inexistant"] = len(self.object[year]["etuds_inexistant"])
stats[year]["abs_invalide"] = len(self.object[year]["abs_invalide"])
stats[year]["etuds_inexistant"] = len(item["etuds_inexistant"])
stats[year]["abs_invalide"] = len(item["abs_invalide"])
return stats
def export(self, file):
"""Sérialise les statistiques dans un fichier"""
dump(self.object, file, indent=2)
@ -242,20 +255,20 @@ def migrate_abs_to_assiduites(
absences_len: int = absences.count()
print(
f"{ProgressBarColors.BLUE}{absences_len} absences vont être migrées{ProgressBarColors.RESET}"
f"{TerminalColor.BLUE}{absences_len} absences vont être migrées{TerminalColor.RESET}"
)
printProgressBar(0, absences_len, "Progression", "effectué", autosize=True)
print_progress_bar(0, absences_len, "Progression", "effectué", autosize=True)
for i, abs in enumerate(absences):
for i, abs_ in enumerate(absences):
try:
_from_abs_to_assiduite_justificatif(abs)
except Exception as e:
stats.add_problem(abs, e.args[0])
_from_abs_to_assiduite_justificatif(abs_)
except ValueError as e:
stats.add_problem(abs_, e.args[0])
if i % 10 == 0:
printProgressBar(
print_progress_bar(
i,
absences_len,
"Progression",
@ -264,7 +277,7 @@ def migrate_abs_to_assiduites(
)
if i % 1000 == 0:
printProgressBar(
print_progress_bar(
i,
absences_len,
"Progression",
@ -278,7 +291,7 @@ def migrate_abs_to_assiduites(
db.session.commit()
printProgressBar(
print_progress_bar(
absences_len,
absences_len,
"Progression",
@ -290,14 +303,14 @@ def migrate_abs_to_assiduites(
statistiques: dict = stats.compute_stats()
print(
f"{ProgressBarColors.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {ProgressBarColors.RESET}"
f"{TerminalColor.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {TerminalColor.RESET}"
)
print(
f"{ProgressBarColors.RED}{statistiques['total']} absences qui n'ont pas pu être migrées."
f"{TerminalColor.RED}{statistiques['total']} absences qui n'ont pas pu être migrées."
)
print(
f"Vous retrouverez un fichier json {ProgressBarColors.GREEN}/opt/scodoc-data/log/scodoc_migration_abs.json{ProgressBarColors.RED} contenant les problèmes de migrations"
f"Vous retrouverez un fichier json {TerminalColor.GREEN}/opt/scodoc-data/log/scodoc_migration_abs.json{TerminalColor.RED} contenant les problèmes de migrations"
)
with open(
"/opt/scodoc-data/log/scodoc_migration_abs.json", "w", encoding="utf-8"
@ -305,7 +318,7 @@ def migrate_abs_to_assiduites(
stats.export(file)
print(
f"{ProgressBarColors.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés.{ProgressBarColors.RESET}"
f"{TerminalColor.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés.{TerminalColor.RESET}"
)
print(dumps(statistiques, indent=2))
@ -316,7 +329,7 @@ def _from_abs_to_assiduite_justificatif(_abs: Absence):
if _abs.etudid not in _glob.CURRENT_ETU:
etud: Identite = Identite.query.filter_by(id=_abs.etudid).first()
if etud is None:
raise Exception("Etudiant inexistant")
raise ValueError("Etudiant inexistant")
_glob.CURRENT_ETU.append(_abs.etudid)
if _abs.estabs:
@ -331,7 +344,7 @@ def _from_abs_to_assiduite_justificatif(_abs: Absence):
).first()
)
if moduleimpl_inscription is None:
raise Exception("Moduleimpl_id incorrect ou étudiant non inscrit")
raise ValueError("Moduleimpl_id incorrect ou étudiant non inscrit")
if _glob.MERGER_ASSI is None:
_glob.MERGER_ASSI = _Merger(_abs, True)