assiduites : Nouveau comptage + script migration (ajout progresse bar + options)

This commit is contained in:
iziram 2023-02-08 19:48:34 +01:00
parent c11599b64f
commit e18990d804
5 changed files with 337 additions and 110 deletions

View File

@ -5,7 +5,142 @@ from app.models.assiduites import Assiduite, Justificatif
from app.models.etudiants import Identite
from app.models.formsemestre import FormSemestre, FormSemestreInscription
# TOTALK: Réfléchir sur le fractionnement d'une assiduite prolongée
class CountCalculator:
def __init__(
self,
morning: time = time(8, 0),
noon: time = time(12, 0),
after_noon: time = time(14, 00),
evening: time = time(18, 0),
skip_saturday: bool = True,
) -> None:
self.morning: time = morning
self.noon: time = noon
self.after_noon: time = after_noon
self.evening: time = evening
self.skip_saturday: bool = skip_saturday
delta_total: timedelta = datetime.combine(date.min, evening) - datetime.combine(
date.min, morning
)
delta_lunch: timedelta = datetime.combine(
date.min, after_noon
) - datetime.combine(date.min, noon)
self.hour_per_day: float = (delta_total - delta_lunch).total_seconds() / 3600
self.days: list[date] = []
self.half_days: list[tuple[date, bool]] = [] # tuple -> (date, morning:bool)
self.hours: float = 0.0
self.count: int = 0
def add_half_day(self, day: date, is_morning: bool = True):
key: tuple[date, bool] = (day, is_morning)
if key not in self.half_days:
self.half_days.append(key)
def add_day(self, day: date):
if day not in self.days:
self.days.append(day)
def check_in_morning(self, period: tuple[datetime, datetime]) -> bool:
interval_morning: tuple[datetime, datetime] = (
scu.localize_datetime(datetime.combine(period[0].date(), self.morning)),
scu.localize_datetime(datetime.combine(period[0].date(), self.noon)),
)
in_morning: bool = scu.is_period_overlapping(period, interval_morning)
return in_morning
def check_in_evening(self, period: tuple[datetime, datetime]) -> bool:
interval_evening: tuple[datetime, datetime] = (
scu.localize_datetime(datetime.combine(period[0].date(), self.after_noon)),
scu.localize_datetime(datetime.combine(period[0].date(), self.evening)),
)
in_evening: bool = scu.is_period_overlapping(period, interval_evening)
return in_evening
def compute_long_assiduite(self, assi: Assiduite):
pointer_date: date = assi.date_debut.date() + timedelta(days=1)
start_hours: timedelta = assi.date_debut - scu.localize_datetime(
datetime.combine(assi.date_debut, self.morning)
)
finish_hours: timedelta = assi.date_fin - scu.localize_datetime(
datetime.combine(assi.date_fin, self.morning)
)
self.add_day(assi.date_debut.date())
self.add_day(assi.date_fin.date())
start_period: tuple[datetime, datetime] = (
assi.date_debut,
scu.localize_datetime(
datetime.combine(assi.date_debut.date(), self.evening)
),
)
finish_period: tuple[datetime, datetime] = (
scu.localize_datetime(datetime.combine(assi.date_fin.date(), self.morning)),
assi.date_fin,
)
hours = 0.0
for period in (start_period, finish_period):
if self.check_in_evening(period):
self.add_half_day(period[0].date(), False)
if self.check_in_morning(period):
self.add_half_day(period[0].date())
while pointer_date < assi.date_fin.date():
if pointer_date.weekday() < (6 - self.skip_saturday):
self.add_day(pointer_date)
self.add_half_day(pointer_date)
self.add_half_day(pointer_date, False)
self.hours += self.hour_per_day
hours += self.hour_per_day
pointer_date += timedelta(days=1)
self.hours += finish_hours.total_seconds() / 3600
self.hours += self.hour_per_day - (start_hours.total_seconds() / 3600)
def compute_assiduites(self, assiduites: Assiduite):
assi: Assiduite
for assi in assiduites.all():
self.count += 1
delta: timedelta = assi.date_fin - assi.date_debut
if delta.days > 0:
# raise Exception(self.hours)
self.compute_long_assiduite(assi)
continue
period: tuple[datetime, datetime] = (assi.date_debut, assi.date_fin)
deb_date: date = assi.date_debut.date()
if self.check_in_morning(period):
self.add_half_day(deb_date)
if self.check_in_evening(period):
self.add_half_day(deb_date, False)
self.add_day(deb_date)
self.hours += delta.total_seconds() / 3600
def to_dict(self) -> dict[str, object]:
return {
"compte": self.count,
"journee": len(self.days),
"demi": len(self.half_days),
"heure": round(self.hours, 2),
}
def get_assiduites_stats(
@ -27,7 +162,10 @@ def get_assiduites_stats(
assiduites = filter_by_formsemestre(assiduites, filtered[key])
if (deb, fin) != (None, None):
assiduites = filter_by_date(assiduites, Assiduite, deb, fin)
count: dict = get_count(assiduites)
calculator: CountCalculator = CountCalculator()
calculator.compute_assiduites(assiduites)
count: dict = calculator.to_dict()
metrics: list[str] = metric.split(",")
@ -39,104 +177,104 @@ def get_assiduites_stats(
return output if output else count
def big_counter(
interval: tuple[datetime],
pref_time: time = time(12, 0),
):
curr_date: datetime
# def big_counter(
# interval: tuple[datetime],
# pref_time: time = time(12, 0),
# ):
# curr_date: datetime
if interval[0].time() >= pref_time:
curr_date = scu.localize_datetime(
datetime.combine(interval[0].date(), pref_time)
)
else:
curr_date = scu.localize_datetime(
datetime.combine(interval[0].date(), time(0, 0))
)
# if interval[0].time() >= pref_time:
# curr_date = scu.localize_datetime(
# datetime.combine(interval[0].date(), pref_time)
# )
# else:
# curr_date = scu.localize_datetime(
# datetime.combine(interval[0].date(), time(0, 0))
# )
def next_(curr: datetime, journee):
if curr.time() != pref_time:
next_time = scu.localize_datetime(datetime.combine(curr.date(), pref_time))
else:
next_time = scu.localize_datetime(
datetime.combine(curr.date() + timedelta(days=1), time(0, 0))
)
journee += 1
return next_time, journee
# def next_(curr: datetime, journee):
# if curr.time() != pref_time:
# next_time = scu.localize_datetime(datetime.combine(curr.date(), pref_time))
# else:
# next_time = scu.localize_datetime(
# datetime.combine(curr.date() + timedelta(days=1), time(0, 0))
# )
# journee += 1
# return next_time, journee
demi: int = 0
j: int = 0
while curr_date <= interval[1]:
next_time: datetime
next_time, j = next_(curr_date, j)
if scu.is_period_overlapping((curr_date, next_time), interval, True):
demi += 1
curr_date = next_time
# demi: int = 0
# j: int = 0
# while curr_date <= interval[1]:
# next_time: datetime
# next_time, j = next_(curr_date, j)
# if scu.is_period_overlapping((curr_date, next_time), interval, True):
# demi += 1
# curr_date = next_time
delta: timedelta = interval[1] - interval[0]
heures: float = delta.total_seconds() / 3600
# delta: timedelta = interval[1] - interval[0]
# heures: float = delta.total_seconds() / 3600
if delta.days >= 1:
heures -= delta.days * 16
# if delta.days >= 1:
# heures -= delta.days * 16
return (demi, j, heures)
# return (demi, j, heures)
def get_count(
assiduites: Assiduite, noon: time = time(hour=12)
) -> dict[str, int or float]:
"""Fonction permettant de compter les assiduites
-> seul "compte" est correcte lorsque les assiduites viennent de plusieurs étudiants
"""
# TODO: Comptage demi journée / journée d'assiduité longue
output: dict[str, int or float] = {}
compte: int = assiduites.count()
heure: float = 0.0
journee: int = 0
demi: int = 0
# def get_count(
# assiduites: Assiduite, noon: time = time(hour=12)
# ) -> dict[str, int or float]:
# """Fonction permettant de compter les assiduites
# -> seul "compte" est correcte lorsque les assiduites viennent de plusieurs étudiants
# """
# # TODO: Comptage demi journée / journée d'assiduité longue
# output: dict[str, int or float] = {}
# compte: int = assiduites.count()
# heure: float = 0.0
# journee: int = 0
# demi: int = 0
all_assiduites: list[Assiduite] = assiduites.order_by(Assiduite.date_debut).all()
# all_assiduites: list[Assiduite] = assiduites.order_by(Assiduite.date_debut).all()
current_day: date = None
current_time: str = None
# current_day: date = None
# current_time: str = None
midnight: time = time(hour=0)
# midnight: time = time(hour=0)
def time_check(dtime):
return midnight <= dtime.time() <= noon
# def time_check(dtime):
# return midnight <= dtime.time() <= noon
for ass in all_assiduites:
delta: timedelta = ass.date_fin - ass.date_debut
# for ass in all_assiduites:
# delta: timedelta = ass.date_fin - ass.date_debut
if delta.days > 0:
# if delta.days > 0:
computed_values: tuple[int, int, float] = big_counter(
(ass.date_debut, ass.date_fin), noon
)
# computed_values: tuple[int, int, float] = big_counter(
# (ass.date_debut, ass.date_fin), noon
# )
demi += computed_values[0] - 1
journee += computed_values[1] - 1
heure += computed_values[2]
# demi += computed_values[0] - 1
# journee += computed_values[1] - 1
# heure += computed_values[2]
current_day = ass.date_fin.date()
continue
# current_day = ass.date_fin.date()
# continue
heure += delta.total_seconds() / 3600
# heure += delta.total_seconds() / 3600
ass_time: str = time_check(ass.date_debut)
# ass_time: str = time_check(ass.date_debut)
if current_day != ass.date_debut.date():
current_day = ass.date_debut.date()
current_time = ass_time
demi += 1
journee += 1
# if current_day != ass.date_debut.date():
# current_day = ass.date_debut.date()
# current_time = ass_time
# demi += 1
# journee += 1
if current_time != ass_time:
current_time = ass_time
demi += 1
# if current_time != ass_time:
# current_time = ass_time
# demi += 1
heure = round(heure, 2)
return {"compte": compte, "journee": journee, "heure": heure, "demi": demi}
# heure = round(heure, 2)
# return {"compte": compte, "journee": journee, "heure": heure, "demi": demi}
def filter_assiduites_by_etat(assiduites: Assiduite, etat: str) -> Assiduite:

View File

@ -159,15 +159,11 @@ def localize_datetime(date: datetime.datetime or str) -> datetime.datetime:
new_date: datetime.datetime = date
if date is not None and date.tzinfo is None:
from app.models.assiduites import Assiduite
first_assiduite = Assiduite.query.first()
if first_assiduite is not None:
new_date = date.replace(tzinfo=first_assiduite.date_debut.tzinfo)
else:
# TOTALK: Paramètre permettant d'avoir l'UTC par défaut
tmp = is_iso_formated("2022-01-01T08:00:00+01:00", True)
new_date = date.replace(tzinfo=tmp.tzinfo)
# TOTALK: Paramètre scodoc pour avoir la timezone du serveur/ timezone paramétrée
time_zone: datetime.timezone = datetime.timezone(
datetime.timedelta(seconds=3600), "default"
)
new_date = date.replace(tzinfo=time_zone)
return new_date

View File

@ -471,15 +471,35 @@ def migrate_scodoc7_dept_archives(dept: str): # migrate-scodoc7-dept-archives
@app.cli.command()
@click.argument("dept", default="")
@click.argument("morning", default="")
@click.argument("noon", default="")
@click.argument("evening", default="")
@click.option(
"-d", "--dept", help="Restreint la migration au dept sélectionné (ACRONYME)"
)
@click.option(
"-m",
"--morning",
help="Spécifie l'heure de début des cours format `hh:mm`",
default="08h00",
show_default=True,
)
@click.option(
"-n",
"--noon",
help="Spécifie l'heure de fin du matin (et donc début de l'après-midi) format `hh:mm`",
default="12h00",
show_default=True,
)
@click.option(
"-e",
"--evening",
help="Spécifie l'heure de fin des cours format `hh:mm`",
default="18h00",
show_default=True,
)
@with_appcontext
def migrate_abs_to_assiduites(
dept: str = "", morning: str = "", noon: str = "", evening: str = ""
): # migrate-scodoc7-dept-archives
"""Post-migration: renomme les archives en fonction des id de ScoDoc 9"""
dept: str = None, morning: str = None, noon: str = None, evening: str = None
): # migrate-abs-to-assiduites
"""Permet de migrer les absences vers le nouveau module d'assiduités"""
tools.migrate_abs_to_assiduites(dept, morning, noon, evening)

View File

@ -449,7 +449,7 @@ def ajouter_assiduites(
{
"etat": scu.EtatAssiduite.RETARD,
"deb": "2022-11-04T11:00:01+01:00",
"fin": "2022-12-04T12:00+01:00",
"fin": "2022-12-05T12:00+01:00",
"moduleimpl": None,
"desc": "Description",
},
@ -528,12 +528,14 @@ def verifier_comptage_et_filtrage_assiduites(
assert comptage["compte"] == 6 + 1, "la métrique 'Comptage' n'est pas bien calculée"
assert (
comptage["journee"] == 3 + 30
comptage["journee"] == 3 + 22
), "la métrique 'Journée' n'est pas bien calculée"
assert (
comptage["demi"] == 4 + 60
comptage["demi"] == 4 + 43
), "la métrique 'Demi-Journée' n'est pas bien calculée"
assert comptage["heure"] == 8 + 241, "la métrique 'Heure' n'est pas bien calculée"
assert comptage["heure"] == float(
8 + 169
), "la métrique 'Heure' n'est pas bien calculée"
# Vérification du filtrage classique

View File

@ -1,4 +1,5 @@
# Script de migration des données de la base "absences" -> "assiduites"/"justificatifs"
import shutil
from app import db
@ -19,6 +20,60 @@ class glob:
DUPLICATED: list[Justificatif] = []
class bcolors:
BLUE = "\033[94m"
CYAN = "\033[96m"
GREEN = "\033[92m"
MAGENTA = "\033[95m"
RED = "\033[91m"
RESET = "\033[0m"
def printProgressBar(
iteration,
total,
prefix="",
suffix="",
finish_msg="",
decimals=1,
length=100,
fill="",
autosize=False,
):
"""
Affiche une progress bar à un point donné (mettre dans une boucle pour rendre dynamique)
@params:
iteration - Required : index du point donné (Int)
total - Required : nombre total avant complétion (eg: len(List))
prefix - Optional : Préfix -> écrit à gauche de la barre (Str)
suffix - Optional : Suffix -> écrit à droite de la barre (Str)
decimals - Optional : nombres de chiffres après la virgule (Int)
length - Optional : taille de la barre en nombre de caractères (Int)
fill - Optional : charactère de remplissange de la barre (Str)
autosize - Optional : Choisir automatiquement la taille de la barre en fonction du terminal (Bool)
"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
color = bcolors.RED
if 50 > float(percent) > 25:
color = bcolors.MAGENTA
if 75 > float(percent) > 50:
color = bcolors.BLUE
if 90 > float(percent) > 75:
color = bcolors.CYAN
if 100 >= float(percent) > 90:
color = bcolors.GREEN
styling = f"{prefix} |{fill}| {percent}% {suffix}"
if autosize:
cols, _ = shutil.get_terminal_size(fallback=(length, 1))
length = cols - len(styling)
filledLength = int(length * iteration // total)
bar = fill * filledLength + "-" * (length - filledLength)
print(f"\r{color}{styling.replace(fill, bar)}{bcolors.RESET}", end="\r")
# Affiche une nouvelle ligne vide
if iteration == total:
print(f"\n{finish_msg}")
def migrate_abs_to_assiduites(
dept: str = "", morning: str = None, noon: str = None, evening: str = None
):
@ -40,26 +95,26 @@ def migrate_abs_to_assiduites(
.entry_date: datetime -> timestamp d'entrée de l'abs
.etudid: relation -> Identite
"""
if morning == "":
if morning is None:
pref_time_morning = time(8, 0)
else:
morning: list[str] = morning.split("h")
pref_time_morning = time(int(morning[0]), int(morning[1]))
if noon == "":
if noon is None:
pref_time_noon = time(12, 0)
else:
noon: list[str] = noon.split("h")
pref_time_noon = time(int(noon[0]), int(noon[1]))
if evening == "":
if evening is None:
pref_time_evening = time(18, 0)
else:
evening: list[str] = evening.split("h")
pref_time_evening = time(int(evening[0]), int(evening[1]))
absences_query = Absence.query
if dept != "":
if dept is not None:
depts_id = [dep.id for dep in Departement.query.filter_by(acronym=dept).all()]
absences_query = absences_query.filter(Absence.etudid.in_(depts_id))
absences: list[Absence] = absences_query.order_by(Absence.jour).all()
@ -67,26 +122,33 @@ def migrate_abs_to_assiduites(
glob.DUPLICATED = []
glob.DUPLICATIONS_ASSIDUITES = {}
for abs in absences:
print(f"\n== {abs.jour}:{abs.etudid}:{abs.matin} ==")
absences_len: int = len(absences)
printProgressBar(0, absences_len, "Progression", "effectué", autosize=True)
for i, abs in enumerate(absences):
if abs.estabs:
generated = _from_abs_to_assiduite(
abs, pref_time_morning, pref_time_noon, pref_time_evening
)
if not isinstance(generated, str):
db.session.add(generated)
print(
f"{abs.jour}:absence:{abs.etudid}:{abs.matin} -> {generated.date_debut}:{generated.date_fin}"
)
if abs.estjust:
generated = _from_abs_to_justificatif(
abs, pref_time_morning, pref_time_noon, pref_time_evening
)
if not isinstance(generated, str):
db.session.add(generated)
print(
f"{abs.jour}:justif:{abs.etudid}:{abs.matin} -> {generated.date_debut}:{generated.date_fin}"
)
printProgressBar(
i,
absences_len,
"Progression",
"effectué",
autosize=True,
)
dup_assi = glob.DUPLICATED
assi: Assiduite
@ -96,6 +158,15 @@ def migrate_abs_to_assiduites(
db.session.commit()
printProgressBar(
absences_len,
absences_len,
"Progression",
"effectué",
autosize=True,
finish_msg=f"{bcolors.GREEN}Les absences ont bien été migrées.{bcolors.RESET}",
)
def _from_abs_to_assiduite(
_abs: Absence, morning: time, noon: time, evening: time