1
0
forked from ScoDoc/ScoDoc
ScoDoc/tools/migrate_abs_to_assiduites.py

452 lines
13 KiB
Python

"""
Script de migration des données de la base "absences" -> "assiduites"/"justificatifs"
Ecrit par Matthias HARTMANN
"""
from datetime import date, datetime, time, timedelta
from json import dump, dumps
from sqlalchemy import not_
from flask import g
from app import db
from app.models import (
Absence,
Assiduite,
Departement,
Identite,
Justificatif,
ModuleImplInscription,
)
from app.models.config import ScoDocSiteConfig
from app.models.assiduites import (
compute_assiduites_justified,
)
from app.profiler import Profiler
from app.scodoc.sco_utils import (
EtatAssiduite,
EtatJustificatif,
TerminalColor,
localize_datetime,
print_progress_bar,
)
from app.scodoc import notesdb as ndb
class _glob:
"""variables globales du script"""
DEBUG: bool = False
PROBLEMS: dict[int, list[str]] = {}
DEPT_ETUDIDS: dict[int, Identite] = {}
COMPTE: list[int, int] = []
ERR_ETU: list[int] = []
MERGER_ASSI: "_Merger" = None
MERGER_JUST: "_Merger" = None
MORNING: time = None
NOON: time = None
EVENING: time = None
class _Merger:
def __init__(self, abs_: Absence, est_abs: bool) -> None:
self.deb = (abs_.jour, abs_.matin)
self.fin = (abs_.jour, abs_.matin)
self.moduleimpl = abs_.moduleimpl_id
self.etudid = abs_.etudid
self.est_abs = est_abs
self.raison = abs_.description
self.entry_date = abs_.entry_date
def merge(self, abs_: Absence) -> bool:
"""Fusionne les absences.
Return False si pas de fusion.
"""
if self.etudid != abs_.etudid:
return False
# Cas d'une même absence enregistrée plusieurs fois
if self.fin == (abs_.jour, abs_.matin):
self.moduleimpl = None
else:
if self.fin[1]:
if abs_.jour != self.fin[0]:
return False
else:
day_after: date = abs_.jour - timedelta(days=1) == self.fin[0]
if not (day_after and abs_.matin):
return False
self.fin = (abs_.jour, abs_.matin)
return True
@staticmethod
def _tuple_to_date(couple: tuple[date, bool], end=False):
if couple[1]:
time_ = _glob.NOON if end else _glob.MORNING
date_ = datetime.combine(couple[0], time_)
else:
time_ = _glob.EVENING if end else _glob.NOON
date_ = datetime.combine(couple[0], time_)
d = localize_datetime(date_)
return d
def _to_justif(self):
date_deb = _Merger._tuple_to_date(self.deb)
date_fin = _Merger._tuple_to_date(self.fin, end=True)
_glob.cursor.execute(
"""INSERT INTO justificatifs
(etudid,date_debut,date_fin,etat,raison,entry_date)
VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(raison)s,%(entry_date)s)
""",
{
"etudid": self.etudid,
"date_debut": date_deb,
"date_fin": date_fin,
"etat": EtatJustificatif.VALIDE,
"raison": self.raison,
"entry_date": self.entry_date,
},
)
# retour = Justificatif.fast_create_justificatif(
# etudid=self.etudid,
# date_debut=date_deb,
# date_fin=date_fin,
# etat=EtatJustificatif.VALIDE,
# raison=self.raison,
# entry_date=self.entry_date,
# )
# return retour
def _to_assi(self):
date_deb = _Merger._tuple_to_date(self.deb)
date_fin = _Merger._tuple_to_date(self.fin, end=True)
_glob.cursor.execute(
"""INSERT INTO assiduites
(etudid,date_debut,date_fin,etat,moduleimpl_id,"desc",entry_date)
VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(moduleimpl_id)s,%(desc)s,%(entry_date)s)
""",
{
"etudid": self.etudid,
"date_debut": date_deb,
"date_fin": date_fin,
"etat": EtatAssiduite.ABSENT,
"moduleimpl_id": self.moduleimpl,
"desc": self.raison,
"entry_date": self.entry_date,
},
)
# retour = Assiduite.fast_create_assiduite(
# etudid=self.etudid,
# date_debut=date_deb,
# date_fin=date_fin,
# etat=EtatAssiduite.ABSENT,
# moduleimpl_id=self.moduleimpl,
# description=self.raison,
# entry_date=self.entry_date,
# )
# return retour
def export(self):
"""Génère un nouvel objet Assiduité ou Justificatif"""
obj: Assiduite or Justificatif = None
if self.est_abs:
_glob.COMPTE[0] += 1
self._to_assi()
else:
_glob.COMPTE[1] += 1
self._to_justif()
class _Statistics:
def __init__(self) -> None:
self.object: dict[str, dict or int] = {"total": 0}
self.year: int = None
def __set_year(self, year: int):
if year not in self.object:
self.object[year] = {
"etuds_inexistant": [],
"abs_invalide": {},
}
self.year = year
return self
def __add_etud(self, etudid: int):
if etudid not in self.object[self.year]["etuds_inexistant"]:
self.object[self.year]["etuds_inexistant"].append(etudid)
return self
def __add_abs(self, abs_: int, err: str):
if abs_ not in self.object[self.year]["abs_invalide"]:
self.object[self.year]["abs_invalide"][abs_] = [err]
else:
self.object[self.year]["abs_invalide"][abs_].append(err)
return self
def add_problem(self, abs_: Absence, err: str):
"""Ajoute un nouveau problème dans les statistiques"""
abs_.jour: date
pivot: date = date(abs_.jour.year, 9, 15)
year: int = abs_.jour.year
if pivot < abs_.jour:
year += 1
self.__set_year(year)
if err == "Etudiant inexistant":
self.__add_etud(abs_.etudid)
else:
self.__add_abs(abs_.id, err)
self.object["total"] += 1
def compute_stats(self) -> dict:
"""Comptage des statistiques"""
stats: dict = {"total": self.object["total"]}
for year, item in self.object.items():
if year == "total":
continue
stats[year] = {}
stats[year]["etuds_inexistant"] = len(item["etuds_inexistant"])
stats[year]["abs_invalide"] = len(item["abs_invalide"])
return stats
def export(self, file):
"""Sérialise les statistiques dans un fichier"""
dump(self.object, file, indent=2)
def migrate_abs_to_assiduites(
dept: str = None,
morning: str = None,
noon: str = None,
evening: str = None,
debug: bool = False,
):
"""
une absence à 3 états:
|.estabs|.estjust|
|1|0| -> absence non justifiée
|1|1| -> absence justifiée
|0|1| -> justifié
dualité des temps :
.matin: bool (0:00 -> time_pref | time_pref->23:59:59)
.jour : date (jour de l'absence/justificatif)
.moduleimpl_id: relation -> moduleimpl_id
description:str -> motif abs / raison justif
.entry_date: datetime -> timestamp d'entrée de l'abs
.etudid: relation -> Identite
"""
Profiler.clear()
_glob.DEBUG = debug
if morning is None:
morning = ScoDocSiteConfig.get("assi_morning_time", time(8, 0))
morning: list[str] = str(morning).split(":")
_glob.MORNING = time(int(morning[0]), int(morning[1]))
if noon is None:
noon = ScoDocSiteConfig.get("assi_lunch_time", time(13, 0))
noon: list[str] = str(noon).split(":")
_glob.NOON = time(int(noon[0]), int(noon[1]))
if evening is None:
evening = ScoDocSiteConfig.get("assi_afternoon_time", time(18, 0))
evening: list[str] = str(evening).split(":")
_glob.EVENING = time(int(evening[0]), int(evening[1]))
ndb.open_db_connection()
_glob.cnx = g.db_conn
_glob.cursor = _glob.cnx.cursor()
if dept is None:
prof_total = Profiler("MigrationTotal")
prof_total.start()
depart: Departement
for depart in Departement.query.order_by(Departement.id):
migrate_dept(
depart.acronym, _Statistics(), Profiler(f"Migration_{depart.acronym}")
)
prof_total.stop()
print(
TerminalColor.GREEN
+ f"Fin de la migration, elle a durée {prof_total.elapsed():.2f}"
+ TerminalColor.RESET
)
else:
migrate_dept(dept, _Statistics(), Profiler("Migration"))
def migrate_dept(dept_name: str, stats: _Statistics, time_elapsed: Profiler):
time_elapsed.start()
absences_query = Absence.query
dept: Departement = Departement.query.filter_by(acronym=dept_name).first()
if dept is None:
raise ValueError(f"Département inexistant: {dept_name}")
etuds_id: list[int] = [etud.id for etud in dept.etudiants]
absences_query = absences_query.filter(Absence.etudid.in_(etuds_id))
absences: Absence = absences_query.order_by(
Absence.etudid, Absence.jour, not_(Absence.matin)
)
absences_len: int = absences.count()
if absences_len == 0:
print(
f"{TerminalColor.BLUE}Le département {dept_name} ne possède aucune absence.{TerminalColor.RESET}"
)
return
_glob.DEPT_ETUDIDS = {e.id for e in Identite.query.filter_by(dept_id=dept.id)}
_glob.COMPTE = [0, 0]
_glob.ERR_ETU = []
_glob.MERGER_ASSI = None
_glob.MERGER_JUST = None
print(
f"{TerminalColor.BLUE}{absences_len} absences du département {dept_name} vont être migrées{TerminalColor.RESET}"
)
print_progress_bar(0, absences_len, "Progression", "effectué", autosize=True)
etuds_modimpl_ids = {}
for i, abs_ in enumerate(absences):
etud_modimpl_ids = etuds_modimpl_ids.get(abs_.etudid)
if etud_modimpl_ids is None:
etud_modimpl_ids = {
ins.moduleimpl_id
for ins in ModuleImplInscription.query.filter_by(etudid=abs_.etudid)
}
etuds_modimpl_ids[abs_.etudid] = etud_modimpl_ids
try:
_from_abs_to_assiduite_justificatif(abs_, etud_modimpl_ids)
except ValueError as e:
stats.add_problem(abs_, e.args[0])
if i % 10 == 0:
print_progress_bar(
i,
absences_len,
"Progression",
"effectué",
autosize=True,
)
if i % 1000 == 0:
print_progress_bar(
i,
absences_len,
"Progression",
"effectué",
autosize=True,
)
_glob.cnx.commit()
if _glob.MERGER_ASSI is not None:
_glob.MERGER_ASSI.export()
if _glob.MERGER_JUST is not None:
_glob.MERGER_JUST.export()
_glob.cnx.commit()
print_progress_bar(
absences_len,
absences_len,
"Progression",
"effectué",
autosize=True,
)
print(
TerminalColor.RED
+ f"Justification des absences du département {dept_name}, veuillez patienter, ceci peut prendre un certain temps."
+ TerminalColor.RESET
)
justifs: Justificatif = Justificatif.query.join(Identite).filter_by(dept_id=dept.id)
compute_assiduites_justified(justifs, reset=True)
time_elapsed.stop()
statistiques: dict = stats.compute_stats()
print(
f"{TerminalColor.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {TerminalColor.RESET}"
)
filename = f"/opt/scodoc-data/log/{datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}scodoc_migration_abs_{dept_name}.json"
if statistiques["total"] > 0:
print(
f"{TerminalColor.RED}{statistiques['total']} absences qui n'ont pas pu être migrées."
)
print(
f"Vous retrouverez un fichier json {TerminalColor.GREEN}{filename}{TerminalColor.RED} contenant les problèmes de migrations"
)
with open(
filename,
"w",
encoding="utf-8",
) as file:
stats.export(file)
print(
f"{TerminalColor.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés pour le département {dept_name}.{TerminalColor.RESET}"
)
if _glob.DEBUG:
print(dumps(statistiques, indent=2))
def _from_abs_to_assiduite_justificatif(_abs: Absence, etud_modimpl_ids: set[int]):
if _abs.etudid not in _glob.DEPT_ETUDIDS:
raise ValueError("Etudiant inexistant")
if _abs.estabs:
if (_abs.moduleimpl_id is not None) and (
_abs.moduleimpl_id not in etud_modimpl_ids
):
raise ValueError("Moduleimpl_id incorrect ou étudiant non inscrit")
if _glob.MERGER_ASSI is None:
_glob.MERGER_ASSI = _Merger(_abs, True)
return True
elif _glob.MERGER_ASSI.merge(_abs):
return True
else:
_glob.MERGER_ASSI.export()
_glob.MERGER_ASSI = _Merger(_abs, True)
return False
if _glob.MERGER_JUST is None:
_glob.MERGER_JUST = _Merger(_abs, False)
return True
elif _glob.MERGER_JUST.merge(_abs):
return True
else:
_glob.MERGER_JUST.export()
_glob.MERGER_JUST = _Merger(_abs, False)
return False