""" Ecrit par Matthias Hartmann. """ from datetime import date, datetime, time, timedelta from pytz import UTC from flask_sqlalchemy.query import Query from app import log, db import app.scodoc.sco_utils as scu from app.models.assiduites import Assiduite, Justificatif, compute_assiduites_justified from app.models.etudiants import Identite from app.models.formsemestre import FormSemestre, FormSemestreInscription from app.scodoc import sco_formsemestre_inscriptions from app.scodoc import sco_preferences from app.scodoc import sco_cache from app.scodoc import sco_etud from app.models import ScoDocSiteConfig from flask import g class CountCalculator: """ La classe CountCalculator est conçue pour gérer le comptage des assiduités, en calculant le nombre total de jours complets, de demi-journées, et d'heures passées sur une période donnée. Elle prend en compte les jours non travaillés, les horaires de travail standard et les assiduités s'étendant sur plusieurs jours. Utilisation : ------------ 1. Initialisation : La classe peut être initialisée avec des horaires personnalisés pour le matin, le midi et le soir, ainsi qu'une durée de pause déjeuner. Si non spécifiés, les valeurs par défaut seront chargées depuis la configuration `ScoDocSiteConfig`. Exemple d'initialisation : calculator = CountCalculator(morning="08:00", noon="13:00", evening="18:00", nb_heures_par_jour=8) 2. Ajout d'assiduités : Exemple d'ajout d'assiduité : - calculator.compute_assiduites(etudiant.assiduites) - calculator.compute_assiduites([, , , ]) 3. Accès aux métriques : Après l'ajout des assiduités, on peut accéder aux métriques telles que : le nombre total de jours, de demi-journées et d'heures calculées. Exemple d'accès aux métriques : metrics = calculator.to_dict() 4.Réinitialisation du comptage: Si besoin on peut réinitialisé le compteur sans perdre la configuration (horaires personnalisés) Exemple de réinitialisation : calculator.reset() Méthodes Principales : --------------------- - reset() : Réinitialise les compteurs de la classe. - add_half_day(day: date, is_morning: bool) : Ajoute une demi-journée au comptage. - add_day(day: date) : Ajoute un jour complet au comptage. - compute_long_assiduite(assi: Assiduite) : Traite les assiduités s'étendant sur plus d'un jour. - compute_assiduites(assiduites: Query | list) : Calcule les métriques pour une collection d'assiduités. - to_dict() : Retourne les métriques sous forme de dictionnaire. Notes : ------ Détails des calculs des heures: Pour les assiduités courtes (<= 1 jour): heures = assi.deb - assi.fin Pour les assiduités longues (> 1 jour): heures = heures(assi.deb => fin_journee) nb_heure_par_jour * (nb_jours-2) + heures(assi.fin => fin_journee) """ def __init__( self, morning: str = None, noon: str = None, evening: str = None, nb_heures_par_jour: int = None, ) -> None: # Transformation d'une heure "HH:MM" en time(h,m) STR_TIME = lambda x: time(*list(map(int, x.split(":")))) self.morning: time = STR_TIME( morning if morning else ScoDocSiteConfig.get("assi_morning_time", "08:00") ) # Date pivot pour déterminer les demi-journées self.noon: time = STR_TIME( noon if noon else ScoDocSiteConfig.get("assi_lunch_time", "13:00") ) self.evening: time = STR_TIME( evening if evening else ScoDocSiteConfig.get("assi_afternoon_time", "18:00") ) self.non_work_days: list[ scu.NonWorkDays ] = scu.NonWorkDays.get_all_non_work_days(dept_id=g.scodoc_dept_id) delta_total: timedelta = datetime.combine( date.min, self.evening ) - datetime.combine(date.min, self.morning) # Sera utilisé pour les assiduités longues (> 1 journée) self.nb_heures_par_jour = ( nb_heures_par_jour if nb_heures_par_jour else sco_preferences.get_preference( "nb_heures_par_jour", dept_id=g.scodoc_dept_id ) ) self.data = {} self.reset() def reset(self): """Remet à zero les compteurs""" self.data = { "total": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, "absent": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, "absent_just": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, "absent_non_just": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, "retard": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, "retard_just": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, "retard_non_just": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, "present": { "journee": [], "demi": [], "heure": 0, "compte": 0, }, } def get_count_key(self, etat: scu.EtatAssiduite, justi: bool = False) -> str: """Récupère une clé de dictionnaire en fonction de l'état de l'assiduité et si elle est justifié """ keys: dict[EtatAssiduite, str] = { scu.EtatAssiduite.ABSENT: "absent", scu.EtatAssiduite.RETARD: "retard", scu.EtatAssiduite.PRESENT: "present", } count_key: str = keys.get(etat) if etat != scu.EtatAssiduite.PRESENT: count_key += "_just" if justi else "_non_just" return count_key def add_half_day(self, day: date, assi: Assiduite, is_morning: bool = True): """Ajoute une demi-journée dans le comptage""" key: tuple[date, bool] = (day, is_morning) count_key: str = self.get_count_key(assi.etat, assi.est_just) if assi.etat != scu.EtatAssiduite.PRESENT: _key: str = scu.EtatAssiduite.inverse().get(assi.etat).name.lower() if key not in self.data[_key]["demi"]: self.data[_key]["demi"].append(key) if key not in self.data["total"]["demi"]: self.data["total"]["demi"].append(key) if key not in self.data[count_key]["demi"]: self.data[count_key]["demi"].append(key) def add_day(self, day: date, assi: Assiduite): """Ajoute un jour dans le comptage""" count_key: str = self.get_count_key(assi.etat, assi.est_just) if assi.etat != scu.EtatAssiduite.PRESENT: key: str = scu.EtatAssiduite.inverse().get(assi.etat).name.lower() if day not in self.data[key]["journee"]: self.data[key]["journee"].append(day) if day not in self.data["total"]["journee"]: self.data["total"]["journee"].append(day) if day not in self.data[count_key]["journee"]: self.data[count_key]["journee"].append(day) def add_hours(self, hours: float, assi: Assiduite): """Ajoute des heures dans le comptage""" count_key: str = self.get_count_key(assi.etat, assi.est_just) if assi.etat != scu.EtatAssiduite.PRESENT: self.data[scu.EtatAssiduite.inverse().get(assi.etat).name.lower()][ "heure" ] += hours self.data[count_key]["heure"] += hours self.data["total"]["heure"] += hours def add_count(self, assi: Assiduite): """Ajoute 1 count dans le comptage""" count_key: str = self.get_count_key(assi.etat, assi.est_just) if assi.etat != scu.EtatAssiduite.PRESENT: self.data[scu.EtatAssiduite.inverse().get(assi.etat).name.lower()][ "compte" ] += 1 self.data[count_key]["compte"] += 1 self.data["total"]["compte"] += 1 def is_in_morning(self, period: tuple[datetime, datetime]) -> bool: """Vérifiée si la période donnée fait partie du matin (Test sur la date de début) """ interval_morning: tuple[datetime, datetime] = ( scu.localize_datetime(datetime.combine(period[0].date(), self.morning)), scu.localize_datetime(datetime.combine(period[0].date(), self.noon)), ) in_morning: bool = scu.is_period_overlapping( period, interval_morning, bornes=False ) return in_morning def is_in_evening(self, period: tuple[datetime, datetime]) -> bool: """Vérifie si la période fait partie de l'aprèm (test sur la date de début) """ interval_evening: tuple[datetime, datetime] = ( scu.localize_datetime( datetime.combine(period[0].date(), self.noon) + timedelta(seconds=1) ), scu.localize_datetime(datetime.combine(period[0].date(), self.evening)), ) in_evening: bool = scu.is_period_overlapping(period, interval_evening) return in_evening def compute_long_assiduite(self, assi: Assiduite): """Calcule les métriques sur une assiduité longue (plus d'un jour)""" pointer_date: date = assi.date_debut.date() + timedelta(days=1) self.add_day(assi.date_debut.date(), assi) self.add_day(assi.date_fin.date(), assi) start_period: tuple[datetime, datetime] = ( assi.date_debut, scu.localize_datetime( datetime.combine(assi.date_debut.date(), self.evening) ), ) finish_period: tuple[datetime, datetime] = ( scu.localize_datetime(datetime.combine(assi.date_fin.date(), self.morning)), assi.date_fin, ) for period in (start_period, finish_period): if self.is_in_evening(period): self.add_half_day(period[0].date(), assi, False) if self.is_in_morning(period): self.add_half_day(period[0].date(), assi) while pointer_date < assi.date_fin.date(): if pointer_date.weekday() not in self.non_work_days: self.add_day(pointer_date, assi) self.add_half_day(pointer_date, assi) self.add_half_day(pointer_date, assi, False) self.add_hours(self.nb_heures_par_jour, assi) pointer_date += timedelta(days=1) # Gestion des heures des dates de début et des dates de fin deb_hours = (start_period[1] - start_period[0]).total_seconds() / 3600 fin_hours = (finish_period[1] - finish_period[0]).total_seconds() / 3600 self.add_hours(deb_hours + fin_hours, assi) def compute_assiduites(self, assiduites: Query | list): """Calcule les métriques pour la collection d'assiduité donnée""" assi: Assiduite for assi in assiduites: # Ajout vérification workday # (Si préférence mise après avoir déjà noté des assiduités) if assi.date_debut.weekday() in self.non_work_days: continue self.add_count(assi) delta: timedelta = assi.date_fin - assi.date_debut if delta.days > 0: self.compute_long_assiduite(assi) continue period: tuple[datetime, datetime] = (assi.date_debut, assi.date_fin) deb_date: date = assi.date_debut.date() if self.is_in_morning(period): self.add_half_day(deb_date, assi) if self.is_in_evening(period): self.add_half_day(deb_date, assi, False) self.add_day(deb_date, assi) self.add_hours(delta.total_seconds() / 3600, assi) self.setup_data() def setup_data(self): """Met en forme les données pour les journées et les demi-journées : au lieu d'avoir list[str] on a le nombre (len(list[str])) """ for key in self.data: self.data[key]["journee"] = len(self.data[key]["journee"]) self.data[key]["demi"] = len(self.data[key]["demi"]) def to_dict(self, only_total: bool = True) -> dict[str, int | float]: """Retourne les métriques sous la forme d'un dictionnaire""" return self.data["total"] if only_total else self.data def get_assiduites_stats( assiduites: Query, metric: str = "all", filtered: dict[str, object] = None ) -> dict[str, int | float]: """Compte les assiduités en fonction des filtres""" if filtered is not None: deb, fin = None, None for key in filtered: match key: case "etat": assiduites = filter_assiduites_by_etat(assiduites, filtered[key]) case "date_fin": fin = filtered[key] case "date_debut": deb = filtered[key] case "moduleimpl_id": assiduites = filter_by_module_impl(assiduites, filtered[key]) case "formsemestre": assiduites = filter_by_formsemestre( assiduites, Assiduite, filtered[key] ) case "est_just": assiduites = filter_assiduites_by_est_just( assiduites, filtered[key] ) case "user_id": assiduites = filter_by_user_id(assiduites, filtered[key]) if (deb, fin) != (None, None): assiduites = filter_by_date(assiduites, Assiduite, deb, fin) metrics: list[str] = metric.split(",") output: dict = {} calculator: CountCalculator = CountCalculator() calculator.compute_assiduites(assiduites) if filtered is None or "split" not in filtered: count: dict = calculator.to_dict(only_total=True) for key, val in count.items(): if key in metrics: output[key] = val return output if output else count # Récupération des états etats: list[str] = ( filtered["etat"].split(",") if "etat" in filtered else ["absent", "present", "retard"] ) # Préparation du dictionnaire de retour avec les valeurs du calcul count: dict = calculator.to_dict(only_total=False) for etat in etats: if etat != "present": output[etat] = count[etat] output[etat]["justifie"] = count[etat + "_just"] output[etat]["non_justifie"] = count[etat + "_non_just"] else: output[etat] = count[etat] output["total"] = count["total"] return output def filter_assiduites_by_etat(assiduites: Assiduite, etat: str) -> Query: """ Filtrage d'une collection d'assiduites en fonction de leur état """ etats: list[str] = list(etat.split(",")) etats = [scu.EtatAssiduite.get(e, -1) for e in etats] return assiduites.filter(Assiduite.etat.in_(etats)) def filter_assiduites_by_est_just(assiduites: Assiduite, est_just: bool) -> Query: """ Filtrage d'une collection d'assiduites en fonction de s'ils sont justifiés """ return assiduites.filter(Assiduite.est_just == est_just) def filter_by_user_id( collection: Assiduite | Justificatif, user_id: int, ) -> Query: """ Filtrage d'une collection en fonction de l'user_id """ return collection.filter_by(user_id=user_id) def filter_by_date( collection: Assiduite | Justificatif, collection_cls: Assiduite | Justificatif, date_deb: datetime = None, date_fin: datetime = None, strict: bool = False, ) -> Query: """ Filtrage d'une collection d'assiduites en fonction d'une date """ if date_deb is None: date_deb = datetime.min if date_fin is None: date_fin = datetime.max date_deb = scu.localize_datetime(date_deb) # TODO A modifier (timezone ?) date_fin = scu.localize_datetime(date_fin) if not strict: return collection.filter( collection_cls.date_debut <= date_fin, collection_cls.date_fin >= date_deb ) return collection.filter( collection_cls.date_debut < date_fin, collection_cls.date_fin > date_deb ) def filter_justificatifs_by_etat(justificatifs: Query, etat: str) -> Query: """ Filtrage d'une collection de justificatifs en fonction de leur état """ etats: list[str] = list(etat.split(",")) etats = [scu.EtatJustificatif.get(e, -1) for e in etats] return justificatifs.filter(Justificatif.etat.in_(etats)) def filter_by_module_impl(assiduites: Assiduite, module_impl_id: int | None) -> Query: """ Filtrage d'une collection d'assiduites en fonction de l'ID du module_impl """ return assiduites.filter(Assiduite.moduleimpl_id == module_impl_id) def filter_by_formsemestre( collection_query: Assiduite | Justificatif, collection_class: Assiduite | Justificatif, formsemestre: FormSemestre, ) -> Query: """ Filtrage d'une collection en fonction d'un formsemestre """ if formsemestre is None: return collection_query.filter(False) collection_result = ( collection_query.join(Identite, collection_class.etudid == Identite.id) .join( FormSemestreInscription, Identite.id == FormSemestreInscription.etudid, ) .filter(FormSemestreInscription.formsemestre_id == formsemestre.id) ) form_date_debut = formsemestre.date_debut + timedelta(days=1) form_date_fin = formsemestre.date_fin + timedelta(days=1) collection_result = collection_result.filter( collection_class.date_debut >= form_date_debut ) return collection_result.filter(collection_class.date_fin <= form_date_fin) def justifies(justi: Justificatif, obj: bool = False) -> list[int] | Query: """ Retourne la liste des assiduite_id qui sont justifié par la justification Une assiduité est justifiée si elle est COMPLETEMENT ou PARTIELLEMENT comprise dans la plage du justificatif et que l'état du justificatif est "valide". Renvoie des id si obj == False, sinon les Assiduités """ if justi.etat != scu.EtatJustificatif.VALIDE: return [] assiduites_query: Assiduite = Assiduite.query.filter_by(etudid=justi.etudid) assiduites_query = assiduites_query.filter( Assiduite.date_debut >= justi.date_debut, Assiduite.date_fin <= justi.date_fin ) if not obj: return [assi.id for assi in assiduites_query.all()] return assiduites_query def get_all_justified( etudid: int, date_deb: datetime = None, date_fin: datetime = None, moduleimpl_id: int = None, ) -> Query: """Retourne toutes les assiduités justifiées sur une période""" if date_deb is None: date_deb = datetime.min if date_fin is None: date_fin = datetime.max date_deb = scu.localize_datetime(date_deb) date_fin = scu.localize_datetime(date_fin) justified: Query = Assiduite.query.filter_by(est_just=True, etudid=etudid) if moduleimpl_id is not None: justified = justified.filter_by(moduleimpl_id=moduleimpl_id) after = filter_by_date( justified, Assiduite, date_deb, date_fin, ) return after def create_absence( date_debut: datetime, date_fin: datetime, etudid: int, description: str = None, est_just: bool = False, ) -> int: """TODO: doc, dire quand l'utiliser""" # TODO etud: Identite = Identite.query.filter_by(etudid=etudid).first_or_404() assiduite_unique: Assiduite = Assiduite.create_assiduite( etud=etud, date_debut=date_debut, date_fin=date_fin, etat=scu.EtatAssiduite.ABSENT, description=description, ) db.session.add(assiduite_unique) db.session.commit() if est_just: justi = Justificatif.create_justificatif( etudiant=etud, date_debut=date_debut, date_fin=date_fin, etat=scu.EtatJustificatif.VALIDE, raison=description, ) db.session.add(justi) db.session.commit() compute_assiduites_justified(etud.id, [justi]) calculator: CountCalculator = CountCalculator() calculator.compute_assiduites([assiduite_unique]) return calculator.to_dict()["demi"] # Gestion du cache def get_assiduites_count(etudid: int, sem: dict) -> tuple[int, int]: """Les comptes d'absences de cet étudiant dans ce semestre: tuple (nb abs non justifiées, nb abs justifiées) Utilise un cache. """ metrique = sco_preferences.get_preference("assi_metrique", sem["formsemestre_id"]) return get_assiduites_count_in_interval( etudid, sem["date_debut_iso"], sem["date_fin_iso"], scu.translate_assiduites_metric(metrique), ) def formsemestre_get_assiduites_count( etudid: int, formsemestre: FormSemestre, moduleimpl_id: int = None ) -> tuple[int, int]: """Les comptes d'absences de cet étudiant dans ce semestre: tuple (nb abs non justifiées, nb abs justifiées) Utilise un cache. """ metrique = sco_preferences.get_preference("assi_metrique", formsemestre.id) return get_assiduites_count_in_interval( etudid, date_debut=scu.localize_datetime( datetime.combine(formsemestre.date_debut, time(8, 0)) ), date_fin=scu.localize_datetime( datetime.combine(formsemestre.date_fin, time(18, 0)) ), metrique=scu.translate_assiduites_metric(metrique), moduleimpl_id=moduleimpl_id, ) def get_assiduites_count_in_interval( etudid, date_debut_iso: str = "", date_fin_iso: str = "", metrique="demi", date_debut: datetime = None, date_fin: datetime = None, moduleimpl_id: int = None, ): """Les comptes d'absences de cet étudiant entre ces deux dates, incluses: tuple (nb abs, nb abs justifiées) On peut spécifier les dates comme datetime ou iso. Utilise un cache. """ date_debut_iso = date_debut_iso or date_debut.isoformat() date_fin_iso = date_fin_iso or date_fin.isoformat() # TODO Question: pourquoi ne pas cacher toutes les métriques, si l'API les veut toutes ? key = f"{etudid}_{date_debut_iso}_{date_fin_iso}{metrique}_assiduites" r = sco_cache.AbsSemEtudCache.get(key) if not r or moduleimpl_id is not None: date_debut: datetime = date_debut or datetime.fromisoformat(date_debut_iso) date_fin: datetime = date_fin or datetime.fromisoformat(date_fin_iso) assiduites: Query = Assiduite.query.filter_by(etudid=etudid) assiduites = assiduites.filter(Assiduite.etat == scu.EtatAssiduite.ABSENT) assiduites = filter_by_date(assiduites, Assiduite, date_debut, date_fin) if moduleimpl_id is not None: assiduites = assiduites.filter_by(moduleimpl_id=moduleimpl_id) calculator: CountCalculator = CountCalculator() calculator.compute_assiduites(assiduites) calcul: dict = calculator.to_dict(only_total=False) nb_abs: dict = calcul["absent"][metrique] nb_abs_just: dict = calcul["absent_just"][metrique] r = (nb_abs, nb_abs_just) if moduleimpl_id is None: ans = sco_cache.AbsSemEtudCache.set(key, r) if not ans: log("warning: get_assiduites_count failed to cache") return r def invalidate_assiduites_count(etudid: int, sem: dict): """Invalidate (clear) cached counts""" date_debut = sem["date_debut_iso"] date_fin = sem["date_fin_iso"] for met in scu.AssiduitesMetrics.TAG: key = str(etudid) + "_" + date_debut + "_" + date_fin + f"{met}_assiduites" sco_cache.AbsSemEtudCache.delete(key) # Non utilisé def invalidate_assiduites_count_sem(sem: dict): """Invalidate (clear) cached abs counts for all the students of this semestre""" inscriptions = ( sco_formsemestre_inscriptions.do_formsemestre_inscription_listinscrits( sem["formsemestre_id"] ) ) for ins in inscriptions: invalidate_assiduites_count(ins["etudid"], sem) def invalidate_assiduites_etud_date(etudid: int, the_date: datetime): """Doit etre appelé à chaque modification des assiduites pour cet étudiant et cette date. Invalide cache absence et caches semestre """ from app.scodoc import sco_compute_moy # Semestres a cette date: etud = sco_etud.get_etud_info(etudid=etudid, filled=True) if len(etud) == 0: return else: etud = etud[0] sems = [ sem for sem in etud["sems"] if scu.is_iso_formated(sem["date_debut_iso"], True).replace(tzinfo=UTC) <= the_date.replace(tzinfo=UTC) and scu.is_iso_formated(sem["date_fin_iso"], True).replace(tzinfo=UTC) >= the_date.replace(tzinfo=UTC) ] # Invalide les PDF et les absences: for sem in sems: # Inval cache bulletin et/ou note_table if sco_compute_moy.formsemestre_expressions_use_abscounts( sem["formsemestre_id"] ): # certaines formules utilisent les absences pdfonly = False else: # efface toujours le PDF car il affiche en général les absences pdfonly = True sco_cache.invalidate_formsemestre( formsemestre_id=sem["formsemestre_id"], pdfonly=pdfonly ) # Inval cache compteurs absences: invalidate_assiduites_count(etudid, sem) def simple_invalidate_cache(obj: dict, etudid: str | int = None): """Invalide le cache de l'étudiant et du / des semestres""" date_debut = ( obj["date_debut"] if isinstance(obj["date_debut"], datetime) else scu.is_iso_formated(obj["date_debut"], True) ) date_fin = ( obj["date_fin"] if isinstance(obj["date_fin"], datetime) else scu.is_iso_formated(obj["date_fin"], True) ) etudid = etudid if etudid is not None else obj["etudid"] invalidate_assiduites_etud_date(etudid, date_debut) invalidate_assiduites_etud_date(etudid, date_fin) sco_cache.RequeteTableauAssiduiteCache.delete_with(f"tableau-etud-{etudid}")