""" Script de migration des données de la base "absences" -> "assiduites"/"justificatifs" Ecrit par Matthias HARTMANN """ from datetime import date, datetime, time, timedelta from json import dump, dumps from sqlalchemy import not_ from flask import g from app import db from app.models import ( Absence, Assiduite, Departement, Identite, Justificatif, ModuleImplInscription, ) from app.models.assiduites import compute_assiduites_justified from app.profiler import Profiler from app.scodoc.sco_utils import ( EtatAssiduite, EtatJustificatif, TerminalColor, localize_datetime, print_progress_bar, ) class _Merger: """pour typage""" class _glob: """variables globales du script""" PROBLEMS: dict[int, list[str]] = {} CURRENT_ETU: list = [] MODULES: list[tuple[int, int]] = [] COMPTE: list[int, int] = [] ERR_ETU: list[int] = [] MERGER_ASSI: _Merger = None MERGER_JUST: _Merger = None MORNING: time = None NOON: time = None EVENING: time = None class _Merger: def __init__(self, abs_: Absence, est_abs: bool) -> None: self.deb = (abs_.jour, abs_.matin) self.fin = (abs_.jour, abs_.matin) self.moduleimpl = abs_.moduleimpl_id self.etudid = abs_.etudid self.est_abs = est_abs self.raison = abs_.description self.entry_date = abs_.entry_date def merge(self, abs_: Absence) -> bool: """Fusionne les absences""" if self.etudid != abs_.etudid: return False # Cas d'une même absence enregistrée plusieurs fois if self.fin == (abs_.jour, abs_.matin): self.moduleimpl = None else: if self.fin[1]: if abs_.jour != self.fin[0]: return False else: day_after: date = abs_.jour - timedelta(days=1) == self.fin[0] if not (day_after and abs_.matin): return False self.fin = (abs_.jour, abs_.matin) return True @staticmethod def _tuple_to_date(couple: tuple[date, bool], end=False): if couple[1]: time_ = _glob.NOON if end else _glob.MORNING date_ = datetime.combine(couple[0], time_) else: time_ = _glob.EVENING if end else _glob.NOON date_ = datetime.combine(couple[0], time_) d = localize_datetime(date_) return d def _to_justif(self): date_deb = _Merger._tuple_to_date(self.deb) date_fin = _Merger._tuple_to_date(self.fin, end=True) retour = Justificatif.fast_create_justificatif( etudid=self.etudid, date_debut=date_deb, date_fin=date_fin, etat=EtatJustificatif.VALIDE, raison=self.raison, entry_date=self.entry_date, ) return retour def _to_assi(self): date_deb = _Merger._tuple_to_date(self.deb) date_fin = _Merger._tuple_to_date(self.fin, end=True) retour = Assiduite.fast_create_assiduite( etudid=self.etudid, date_debut=date_deb, date_fin=date_fin, etat=EtatAssiduite.ABSENT, moduleimpl_id=self.moduleimpl, description=self.raison, entry_date=self.entry_date, ) return retour def export(self): """Génère un nouvel objet Assiduité ou Justificatif""" obj: Assiduite or Justificatif = None if self.est_abs: _glob.COMPTE[0] += 1 obj = self._to_assi() else: _glob.COMPTE[1] += 1 obj = self._to_justif() db.session.add(obj) class _Statistics: def __init__(self) -> None: self.object: dict[str, dict or int] = {"total": 0} self.year: int = None def __set_year(self, year: int): if year not in self.object: self.object[year] = { "etuds_inexistant": [], "abs_invalide": {}, } self.year = year return self def __add_etud(self, etudid: int): if etudid not in self.object[self.year]["etuds_inexistant"]: self.object[self.year]["etuds_inexistant"].append(etudid) return self def __add_abs(self, abs_: int, err: str): if abs_ not in self.object[self.year]["abs_invalide"]: self.object[self.year]["abs_invalide"][abs_] = [err] else: self.object[self.year]["abs_invalide"][abs_].append(err) return self def add_problem(self, abs_: Absence, err: str): """Ajoute un nouveau problème dans les statistiques""" abs_.jour: date pivot: date = date(abs_.jour.year, 9, 15) year: int = abs_.jour.year if pivot < abs_.jour: year += 1 self.__set_year(year) if err == "Etudiant inexistant": self.__add_etud(abs_.etudid) else: self.__add_abs(abs_.id, err) self.object["total"] += 1 def compute_stats(self) -> dict: """Comptage des statistiques""" stats: dict = {"total": self.object["total"]} for year, item in self.object.items(): if year == "total": continue stats[year] = {} stats[year]["etuds_inexistant"] = len(item["etuds_inexistant"]) stats[year]["abs_invalide"] = len(item["abs_invalide"]) return stats def export(self, file): """Sérialise les statistiques dans un fichier""" dump(self.object, file, indent=2) def migrate_abs_to_assiduites( dept: str = "", morning: str = None, noon: str = None, evening: str = None ): """ une absence à 3 états: |.estabs|.estjust| |1|0| -> absence non justifiée |1|1| -> absence justifiée |0|1| -> justifié dualité des temps : .matin: bool (0:00 -> time_pref | time_pref->23:59:59) .jour : date (jour de l'absence/justificatif) .moduleimpl_id: relation -> moduleimpl_id description:str -> motif abs / raision justif .entry_date: datetime -> timestamp d'entrée de l'abs .etudid: relation -> Identite """ Profiler.clear() stats: _Statistics = _Statistics() time_elapsed: Profiler = Profiler("migration") time_elapsed.start() if morning is None: _glob.MORNING = time(8, 0) else: morning: list[str] = morning.split("h") _glob.MORNING = time(int(morning[0]), int(morning[1])) if noon is None: _glob.NOON = time(12, 0) else: noon: list[str] = noon.split("h") _glob.NOON = time(int(noon[0]), int(noon[1])) if evening is None: _glob.EVENING = time(18, 0) else: evening: list[str] = evening.split("h") _glob.EVENING = time(int(evening[0]), int(evening[1])) absences_query = Absence.query if dept is not None: dept: Departement = Departement.query.filter_by(acronym=dept).first() if dept is not None: etuds_id: list[int] = [etud.id for etud in dept.etudiants] absences_query = absences_query.filter(Absence.etudid.in_(etuds_id)) absences: Absence = absences_query.order_by( Absence.etudid, Absence.jour, not_(Absence.matin) ) _glob.CURRENT_ETU = [] _glob.MODULES = [] _glob.COMPTE = [0, 0] _glob.ERR_ETU = [] _glob.MERGER_ASSI = None _glob.MERGER_JUST = None absences_len: int = absences.count() print( f"{TerminalColor.BLUE}{absences_len} absences vont être migrées{TerminalColor.RESET}" ) print_progress_bar(0, absences_len, "Progression", "effectué", autosize=True) for i, abs_ in enumerate(absences): try: _from_abs_to_assiduite_justificatif(abs_) except ValueError as e: stats.add_problem(abs_, e.args[0]) if i % 10 == 0: print_progress_bar( i, absences_len, "Progression", "effectué", autosize=True, ) if i % 1000 == 0: print_progress_bar( i, absences_len, "Progression", "effectué", autosize=True, ) db.session.commit() _glob.MERGER_ASSI.export() _glob.MERGER_JUST.export() db.session.commit() justifs: Justificatif = Justificatif.query if dept is not None: justifs.filter(Justificatif.etudid.in_(etuds_id)) compute_assiduites_justified(justifs) print_progress_bar( absences_len, absences_len, "Progression", "effectué", autosize=True, ) time_elapsed.stop() statistiques: dict = stats.compute_stats() print( f"{TerminalColor.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {TerminalColor.RESET}" ) print( f"{TerminalColor.RED}{statistiques['total']} absences qui n'ont pas pu être migrées." ) print( f"Vous retrouverez un fichier json {TerminalColor.GREEN}/opt/scodoc-data/log/scodoc_migration_abs.json{TerminalColor.RED} contenant les problèmes de migrations" ) with open( "/opt/scodoc-data/log/scodoc_migration_abs.json", "w", encoding="utf-8" ) as file: stats.export(file) print( f"{TerminalColor.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés.{TerminalColor.RESET}" ) print(dumps(statistiques, indent=2)) def _from_abs_to_assiduite_justificatif(_abs: Absence): if _abs.etudid not in _glob.CURRENT_ETU: etud: Identite = Identite.query.filter_by(id=_abs.etudid).first() if etud is None: raise ValueError("Etudiant inexistant") _glob.CURRENT_ETU.append(_abs.etudid) if _abs.estabs: moduleimpl_id: int = _abs.moduleimpl_id if ( moduleimpl_id is not None and (_abs.etudid, _abs.moduleimpl_id) not in _glob.MODULES ): moduleimpl_inscription: ModuleImplInscription = ( ModuleImplInscription.query.filter_by( moduleimpl_id=_abs.moduleimpl_id, etudid=_abs.etudid ).first() ) if moduleimpl_inscription is None: raise ValueError("Moduleimpl_id incorrect ou étudiant non inscrit") if _glob.MERGER_ASSI is None: _glob.MERGER_ASSI = _Merger(_abs, True) return True elif _glob.MERGER_ASSI.merge(_abs): return True else: _glob.MERGER_ASSI.export() _glob.MERGER_ASSI = _Merger(_abs, True) return False if _glob.MERGER_JUST is None: _glob.MERGER_JUST = _Merger(_abs, False) return True elif _glob.MERGER_JUST.merge(_abs): return True else: _glob.MERGER_JUST.export() _glob.MERGER_JUST = _Merger(_abs, False) return False