""" Script de migration des données de la base "absences" -> "assiduites"/"justificatifs" Ecrit par Matthias HARTMANN """ from datetime import date, datetime, time, timedelta from json import dump, dumps from sqlalchemy import not_ from flask import g from app import db from app.models import ( Absence, Assiduite, Departement, Identite, Justificatif, ModuleImplInscription, ) from app.models.config import ScoDocSiteConfig from app.models.assiduites import ( compute_assiduites_justified, ) from app.profiler import Profiler from app.scodoc.sco_utils import ( EtatAssiduite, EtatJustificatif, TerminalColor, localize_datetime, print_progress_bar, ) from app.scodoc import notesdb as ndb class _glob: """variables globales du script""" DEBUG: bool = False PROBLEMS: dict[int, list[str]] = {} DEPT_ETUDIDS: dict[int, Identite] = {} COMPTE: list[int, int] = [] ERR_ETU: list[int] = [] MERGER_ASSI: "_Merger" = None MERGER_JUST: "_Merger" = None MORNING: time = None NOON: time = None EVENING: time = None class _Merger: def __init__(self, abs_: Absence, est_abs: bool) -> None: self.deb = (abs_.jour, abs_.matin) self.fin = (abs_.jour, abs_.matin) self.moduleimpl = abs_.moduleimpl_id self.etudid = abs_.etudid self.est_abs = est_abs self.raison = abs_.description self.entry_date = abs_.entry_date def merge(self, abs_: Absence) -> bool: """Fusionne les absences. Return False si pas de fusion. """ if self.etudid != abs_.etudid: return False # Cas d'une même absence enregistrée plusieurs fois if self.fin == (abs_.jour, abs_.matin): self.moduleimpl = None else: if self.fin[1]: if abs_.jour != self.fin[0]: return False else: day_after: date = abs_.jour - timedelta(days=1) == self.fin[0] if not (day_after and abs_.matin): return False self.fin = (abs_.jour, abs_.matin) return True @staticmethod def _tuple_to_date(couple: tuple[date, bool], end=False): if couple[1]: time_ = _glob.NOON if end else _glob.MORNING date_ = datetime.combine(couple[0], time_) else: time_ = _glob.EVENING if end else _glob.NOON date_ = datetime.combine(couple[0], time_) d = localize_datetime(date_) return d def _to_justif(self): date_deb = _Merger._tuple_to_date(self.deb) date_fin = _Merger._tuple_to_date(self.fin, end=True) _glob.cursor.execute( """INSERT INTO justificatifs (etudid,date_debut,date_fin,etat,raison,entry_date) VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(raison)s,%(entry_date)s) """, { "etudid": self.etudid, "date_debut": date_deb, "date_fin": date_fin, "etat": EtatJustificatif.VALIDE, "raison": self.raison, "entry_date": self.entry_date, }, ) # retour = Justificatif.fast_create_justificatif( # etudid=self.etudid, # date_debut=date_deb, # date_fin=date_fin, # etat=EtatJustificatif.VALIDE, # raison=self.raison, # entry_date=self.entry_date, # ) # return retour def _to_assi(self): date_deb = _Merger._tuple_to_date(self.deb) date_fin = _Merger._tuple_to_date(self.fin, end=True) _glob.cursor.execute( """INSERT INTO assiduites (etudid,date_debut,date_fin,etat,moduleimpl_id,"description",entry_date) VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(moduleimpl_id)s,%(description)s,%(entry_date)s) """, { "etudid": self.etudid, "date_debut": date_deb, "date_fin": date_fin, "etat": EtatAssiduite.ABSENT, "moduleimpl_id": self.moduleimpl, "description": self.raison, "entry_date": self.entry_date, }, ) # retour = Assiduite.fast_create_assiduite( # etudid=self.etudid, # date_debut=date_deb, # date_fin=date_fin, # etat=EtatAssiduite.ABSENT, # moduleimpl_id=self.moduleimpl, # description=self.raison, # entry_date=self.entry_date, # ) # return retour def export(self): """Génère un nouvel objet Assiduité ou Justificatif""" obj: Assiduite or Justificatif = None if self.est_abs: _glob.COMPTE[0] += 1 self._to_assi() else: _glob.COMPTE[1] += 1 self._to_justif() class _Statistics: def __init__(self) -> None: self.object: dict[str, dict or int] = {"total": 0} self.year: int = None def __set_year(self, year: int): if year not in self.object: self.object[year] = { "etuds_inexistant": [], "abs_invalide": {}, } self.year = year return self def __add_etud(self, etudid: int): if etudid not in self.object[self.year]["etuds_inexistant"]: self.object[self.year]["etuds_inexistant"].append(etudid) return self def __add_abs(self, abs_: int, err: str): if abs_ not in self.object[self.year]["abs_invalide"]: self.object[self.year]["abs_invalide"][abs_] = [err] else: self.object[self.year]["abs_invalide"][abs_].append(err) return self def add_problem(self, abs_: Absence, err: str): """Ajoute un nouveau problème dans les statistiques""" abs_.jour: date pivot: date = date(abs_.jour.year, 9, 15) year: int = abs_.jour.year if pivot < abs_.jour: year += 1 self.__set_year(year) if err == "Etudiant inexistant": self.__add_etud(abs_.etudid) else: self.__add_abs(abs_.id, err) self.object["total"] += 1 def compute_stats(self) -> dict: """Comptage des statistiques""" stats: dict = {"total": self.object["total"]} for year, item in self.object.items(): if year == "total": continue stats[year] = {} stats[year]["etuds_inexistant"] = len(item["etuds_inexistant"]) stats[year]["abs_invalide"] = len(item["abs_invalide"]) return stats def export(self, file): """Sérialise les statistiques dans un fichier""" dump(self.object, file, indent=2) def migrate_abs_to_assiduites( dept: str = None, morning: str = None, noon: str = None, evening: str = None, debug: bool = False, ): """ une absence à 3 états: |.estabs|.estjust| |1|0| -> absence non justifiée |1|1| -> absence justifiée |0|1| -> justifié dualité des temps : .matin: bool (0:00 -> time_pref | time_pref->23:59:59) .jour : date (jour de l'absence/justificatif) .moduleimpl_id: relation -> moduleimpl_id description:str -> motif abs / raison justif .entry_date: datetime -> timestamp d'entrée de l'abs .etudid: relation -> Identite """ Profiler.clear() _glob.DEBUG = debug if morning is None: morning = ScoDocSiteConfig.get("assi_morning_time", time(8, 0)) morning: list[str] = str(morning).split(":") _glob.MORNING = time(int(morning[0]), int(morning[1])) if noon is None: noon = ScoDocSiteConfig.get("assi_lunch_time", time(13, 0)) noon: list[str] = str(noon).split(":") _glob.NOON = time(int(noon[0]), int(noon[1])) if evening is None: evening = ScoDocSiteConfig.get("assi_afternoon_time", time(18, 0)) evening: list[str] = str(evening).split(":") _glob.EVENING = time(int(evening[0]), int(evening[1])) ndb.open_db_connection() _glob.cnx = g.db_conn _glob.cursor = _glob.cnx.cursor() if dept is None: prof_total = Profiler("MigrationTotal") prof_total.start() depart: Departement for depart in Departement.query.order_by(Departement.id): migrate_dept( depart.acronym, _Statistics(), Profiler(f"Migration_{depart.acronym}") ) prof_total.stop() print( TerminalColor.GREEN + f"Fin de la migration, elle a durée {prof_total.elapsed():.2f}" + TerminalColor.RESET ) else: migrate_dept(dept, _Statistics(), Profiler("Migration")) def migrate_dept(dept_name: str, stats: _Statistics, time_elapsed: Profiler): time_elapsed.start() absences_query = Absence.query dept: Departement = Departement.query.filter_by(acronym=dept_name).first() if dept is None: raise ValueError(f"Département inexistant: {dept_name}") etuds_id: list[int] = [etud.id for etud in dept.etudiants] absences_query = absences_query.filter(Absence.etudid.in_(etuds_id)) absences: Absence = absences_query.order_by( Absence.etudid, Absence.jour, not_(Absence.matin) ) absences_len: int = absences.count() if absences_len == 0: print( f"{TerminalColor.BLUE}Le département {dept_name} ne possède aucune absence.{TerminalColor.RESET}" ) return _glob.DEPT_ETUDIDS = {e.id for e in Identite.query.filter_by(dept_id=dept.id)} _glob.COMPTE = [0, 0] _glob.ERR_ETU = [] _glob.MERGER_ASSI = None _glob.MERGER_JUST = None print( f"{TerminalColor.BLUE}{absences_len} absences du département {dept_name} vont être migrées{TerminalColor.RESET}" ) print_progress_bar(0, absences_len, "Progression", "effectué", autosize=True) etuds_modimpl_ids = {} for i, abs_ in enumerate(absences): etud_modimpl_ids = etuds_modimpl_ids.get(abs_.etudid) if etud_modimpl_ids is None: etud_modimpl_ids = { ins.moduleimpl_id for ins in ModuleImplInscription.query.filter_by(etudid=abs_.etudid) } etuds_modimpl_ids[abs_.etudid] = etud_modimpl_ids try: _from_abs_to_assiduite_justificatif(abs_, etud_modimpl_ids) except ValueError as e: stats.add_problem(abs_, e.args[0]) if i % 10 == 0: print_progress_bar( i, absences_len, "Progression", "effectué", autosize=True, ) if i % 1000 == 0: print_progress_bar( i, absences_len, "Progression", "effectué", autosize=True, ) _glob.cnx.commit() if _glob.MERGER_ASSI is not None: _glob.MERGER_ASSI.export() if _glob.MERGER_JUST is not None: _glob.MERGER_JUST.export() _glob.cnx.commit() print_progress_bar( absences_len, absences_len, "Progression", "effectué", autosize=True, ) print( TerminalColor.RED + f"Justification des absences du département {dept_name}, veuillez patienter, ceci peut prendre un certain temps." + TerminalColor.RESET ) justifs: Justificatif = Justificatif.query.join(Identite).filter_by(dept_id=dept.id) compute_assiduites_justified(justifs, reset=True) time_elapsed.stop() statistiques: dict = stats.compute_stats() print( f"{TerminalColor.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {TerminalColor.RESET}" ) filename = f"/opt/scodoc-data/log/{datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}scodoc_migration_abs_{dept_name}.json" if statistiques["total"] > 0: print( f"{TerminalColor.RED}{statistiques['total']} absences qui n'ont pas pu être migrées." ) print( f"Vous retrouverez un fichier json {TerminalColor.GREEN}{filename}{TerminalColor.RED} contenant les problèmes de migrations" ) with open( filename, "w", encoding="utf-8", ) as file: stats.export(file) print( f"{TerminalColor.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés pour le département {dept_name}.{TerminalColor.RESET}" ) if _glob.DEBUG: print(dumps(statistiques, indent=2)) def _from_abs_to_assiduite_justificatif(_abs: Absence, etud_modimpl_ids: set[int]): if _abs.etudid not in _glob.DEPT_ETUDIDS: raise ValueError("Etudiant inexistant") if _abs.estabs: if (_abs.moduleimpl_id is not None) and ( _abs.moduleimpl_id not in etud_modimpl_ids ): raise ValueError("Moduleimpl_id incorrect ou étudiant non inscrit") if _glob.MERGER_ASSI is None: _glob.MERGER_ASSI = _Merger(_abs, True) return True elif _glob.MERGER_ASSI.merge(_abs): return True else: _glob.MERGER_ASSI.export() _glob.MERGER_ASSI = _Merger(_abs, True) return False if _glob.MERGER_JUST is None: _glob.MERGER_JUST = _Merger(_abs, False) return True elif _glob.MERGER_JUST.merge(_abs): return True else: _glob.MERGER_JUST.export() _glob.MERGER_JUST = _Merger(_abs, False) return False