452 lines
14 KiB
Python
452 lines
14 KiB
Python
"""
|
|
Script de migration des données de la base "absences" -> "assiduites"/"justificatifs"
|
|
|
|
Ecrit par Matthias HARTMANN
|
|
"""
|
|
from datetime import date, datetime, time, timedelta
|
|
from json import dump, dumps
|
|
from sqlalchemy import not_
|
|
|
|
from flask import g
|
|
|
|
from app import db
|
|
from app.models import (
|
|
Absence,
|
|
Assiduite,
|
|
Departement,
|
|
Identite,
|
|
Justificatif,
|
|
ModuleImplInscription,
|
|
)
|
|
|
|
from app.models.config import ScoDocSiteConfig
|
|
|
|
from app.models.assiduites import (
|
|
compute_assiduites_justified,
|
|
)
|
|
from app.profiler import Profiler
|
|
from app.scodoc.sco_utils import (
|
|
EtatAssiduite,
|
|
EtatJustificatif,
|
|
TerminalColor,
|
|
localize_datetime,
|
|
print_progress_bar,
|
|
)
|
|
from app.scodoc import notesdb as ndb
|
|
|
|
|
|
class _glob:
|
|
"""variables globales du script"""
|
|
|
|
DEBUG: bool = False
|
|
PROBLEMS: dict[int, list[str]] = {}
|
|
DEPT_ETUDIDS: dict[int, Identite] = {}
|
|
COMPTE: list[int, int] = []
|
|
ERR_ETU: list[int] = []
|
|
MERGER_ASSI: "_Merger" = None
|
|
MERGER_JUST: "_Merger" = None
|
|
|
|
JUSTIFS: dict[int, list[tuple[datetime, datetime]]] = {}
|
|
|
|
MORNING: time = None
|
|
NOON: time = None
|
|
EVENING: time = None
|
|
|
|
|
|
class _Merger:
|
|
def __init__(self, abs_: Absence, est_abs: bool) -> None:
|
|
self.deb = (abs_.jour, abs_.matin)
|
|
self.fin = (abs_.jour, abs_.matin)
|
|
self.moduleimpl = abs_.moduleimpl_id
|
|
self.etudid = abs_.etudid
|
|
self.est_abs = est_abs
|
|
self.raison = abs_.description
|
|
self.entry_date = abs_.entry_date
|
|
self.est_just = abs_.estjust
|
|
|
|
def merge(self, abs_: Absence) -> bool:
|
|
"""Fusionne les absences.
|
|
Return False si pas de fusion.
|
|
"""
|
|
|
|
if self.etudid != abs_.etudid:
|
|
return False
|
|
|
|
# Cas d'une même absence enregistrée plusieurs fois
|
|
if self.fin == (abs_.jour, abs_.matin):
|
|
self.moduleimpl = None
|
|
else:
|
|
if self.fin[1]:
|
|
if abs_.jour != self.fin[0]:
|
|
return False
|
|
else:
|
|
day_after: date = abs_.jour - timedelta(days=1) == self.fin[0]
|
|
if not (day_after and abs_.matin and self.est_just == abs_.estjust):
|
|
return False
|
|
|
|
self.est_just = self.est_just or abs_.estjust
|
|
|
|
self.fin = (abs_.jour, abs_.matin)
|
|
|
|
return True
|
|
|
|
@staticmethod
|
|
def _tuple_to_date(couple: tuple[date, bool], end=False):
|
|
if couple[1]:
|
|
time_ = _glob.NOON if end else _glob.MORNING
|
|
date_ = datetime.combine(couple[0], time_)
|
|
else:
|
|
time_ = _glob.EVENING if end else _glob.NOON
|
|
date_ = datetime.combine(couple[0], time_)
|
|
d = localize_datetime(date_)
|
|
return d
|
|
|
|
def _to_justif(self):
|
|
date_deb = _Merger._tuple_to_date(self.deb)
|
|
date_fin = _Merger._tuple_to_date(self.fin, end=True)
|
|
|
|
_glob.JUSTIFS[self.etudid].append((date_deb, date_fin))
|
|
|
|
_glob.cursor.execute(
|
|
"""INSERT INTO justificatifs
|
|
(etudid,date_debut,date_fin,etat,raison,entry_date)
|
|
VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(raison)s,%(entry_date)s)
|
|
""",
|
|
{
|
|
"etudid": self.etudid,
|
|
"date_debut": date_deb,
|
|
"date_fin": date_fin,
|
|
"etat": EtatJustificatif.VALIDE,
|
|
"raison": self.raison,
|
|
"entry_date": self.entry_date,
|
|
},
|
|
)
|
|
|
|
def _to_assi(self):
|
|
date_deb = _Merger._tuple_to_date(self.deb)
|
|
date_fin = _Merger._tuple_to_date(self.fin, end=True)
|
|
|
|
self.est_just = (
|
|
_assi_in_justifs(date_deb, date_fin, self.etudid) or self.est_just
|
|
)
|
|
if _glob.MERGER_JUST is not None and not self.est_just:
|
|
justi_date_deb = _Merger._tuple_to_date(_glob.MERGER_JUST.deb)
|
|
justi_date_fin = _Merger._tuple_to_date(_glob.MERGER_JUST.fin, end=True)
|
|
justifiee = date_deb >= justi_date_deb and date_fin <= justi_date_fin
|
|
self.est_just = justifiee
|
|
|
|
_glob.cursor.execute(
|
|
"""INSERT INTO assiduites
|
|
(etudid,date_debut,date_fin,etat,moduleimpl_id,description,entry_date,est_just)
|
|
VALUES (%(etudid)s,%(date_debut)s,%(date_fin)s,%(etat)s,%(moduleimpl_id)s,%(description)s,%(entry_date)s, %(est_just)s)
|
|
""",
|
|
{
|
|
"etudid": self.etudid,
|
|
"date_debut": date_deb,
|
|
"date_fin": date_fin,
|
|
"etat": EtatAssiduite.ABSENT,
|
|
"moduleimpl_id": self.moduleimpl,
|
|
"description": self.raison,
|
|
"entry_date": self.entry_date,
|
|
"est_just": self.est_just,
|
|
},
|
|
)
|
|
|
|
def export(self):
|
|
"""Génère un nouvel objet Assiduité ou Justificatif"""
|
|
obj: Assiduite or Justificatif = None
|
|
if self.est_abs:
|
|
_glob.COMPTE[0] += 1
|
|
self._to_assi()
|
|
else:
|
|
_glob.COMPTE[1] += 1
|
|
self._to_justif()
|
|
|
|
|
|
def _assi_in_justifs(deb, fin, etudid):
|
|
return any(deb >= j[0] and fin <= j[1] for j in _glob.JUSTIFS[etudid])
|
|
|
|
|
|
class _Statistics:
|
|
def __init__(self) -> None:
|
|
self.object: dict[str, dict or int] = {"total": 0}
|
|
self.year: int = None
|
|
|
|
def __set_year(self, year: int):
|
|
if year not in self.object:
|
|
self.object[year] = {
|
|
"etuds_inexistant": [],
|
|
"abs_invalide": {},
|
|
}
|
|
self.year = year
|
|
return self
|
|
|
|
def __add_etud(self, etudid: int):
|
|
if etudid not in self.object[self.year]["etuds_inexistant"]:
|
|
self.object[self.year]["etuds_inexistant"].append(etudid)
|
|
return self
|
|
|
|
def __add_abs(self, abs_: int, err: str):
|
|
if abs_ not in self.object[self.year]["abs_invalide"]:
|
|
self.object[self.year]["abs_invalide"][abs_] = [err]
|
|
else:
|
|
self.object[self.year]["abs_invalide"][abs_].append(err)
|
|
|
|
return self
|
|
|
|
def add_problem(self, abs_: Absence, err: str):
|
|
"""Ajoute un nouveau problème dans les statistiques"""
|
|
abs_.jour: date
|
|
pivot: date = date(abs_.jour.year, 9, 15)
|
|
year: int = abs_.jour.year
|
|
if pivot < abs_.jour:
|
|
year += 1
|
|
self.__set_year(year)
|
|
|
|
if err == "Etudiant inexistant":
|
|
self.__add_etud(abs_.etudid)
|
|
else:
|
|
self.__add_abs(abs_.id, err)
|
|
|
|
self.object["total"] += 1
|
|
|
|
def compute_stats(self) -> dict:
|
|
"""Comptage des statistiques"""
|
|
stats: dict = {"total": self.object["total"]}
|
|
for year, item in self.object.items():
|
|
if year == "total":
|
|
continue
|
|
|
|
stats[year] = {}
|
|
stats[year]["etuds_inexistant"] = len(item["etuds_inexistant"])
|
|
stats[year]["abs_invalide"] = len(item["abs_invalide"])
|
|
|
|
return stats
|
|
|
|
def export(self, file):
|
|
"""Sérialise les statistiques dans un fichier"""
|
|
dump(self.object, file, indent=2)
|
|
|
|
|
|
def migrate_abs_to_assiduites(
|
|
dept: str = None,
|
|
morning: str = None,
|
|
noon: str = None,
|
|
evening: str = None,
|
|
debug: bool = False,
|
|
):
|
|
"""
|
|
une absence à 3 états:
|
|
|
|
|.estabs|.estjust|
|
|
|1|0| -> absence non justifiée
|
|
|1|1| -> absence justifiée
|
|
|0|1| -> justifié
|
|
|
|
dualité des temps :
|
|
|
|
.matin: bool (0:00 -> time_pref | time_pref->23:59:59)
|
|
.jour : date (jour de l'absence/justificatif)
|
|
.moduleimpl_id: relation -> moduleimpl_id
|
|
description:str -> motif abs / raison justif
|
|
|
|
.entry_date: datetime -> timestamp d'entrée de l'abs
|
|
.etudid: relation -> Identite
|
|
"""
|
|
Profiler.clear()
|
|
|
|
_glob.DEBUG = debug
|
|
|
|
if morning is None:
|
|
morning = ScoDocSiteConfig.get("assi_morning_time", time(8, 0))
|
|
|
|
morning: list[str] = str(morning).split(":")
|
|
_glob.MORNING = time(int(morning[0]), int(morning[1]))
|
|
|
|
if noon is None:
|
|
noon = ScoDocSiteConfig.get("assi_lunch_time", time(13, 0))
|
|
|
|
noon: list[str] = str(noon).split(":")
|
|
_glob.NOON = time(int(noon[0]), int(noon[1]))
|
|
|
|
if evening is None:
|
|
evening = ScoDocSiteConfig.get("assi_afternoon_time", time(18, 0))
|
|
|
|
evening: list[str] = str(evening).split(":")
|
|
_glob.EVENING = time(int(evening[0]), int(evening[1]))
|
|
|
|
ndb.open_db_connection()
|
|
_glob.cnx = g.db_conn
|
|
_glob.cursor = _glob.cnx.cursor()
|
|
|
|
if dept is None:
|
|
prof_total = Profiler("MigrationTotal")
|
|
prof_total.start()
|
|
depart: Departement
|
|
for depart in Departement.query.order_by(Departement.id):
|
|
migrate_dept(
|
|
depart.acronym, _Statistics(), Profiler(f"Migration_{depart.acronym}")
|
|
)
|
|
prof_total.stop()
|
|
|
|
print(
|
|
TerminalColor.GREEN
|
|
+ f"Fin de la migration, elle a durée {prof_total.elapsed():.2f}"
|
|
+ TerminalColor.RESET
|
|
)
|
|
|
|
else:
|
|
migrate_dept(dept, _Statistics(), Profiler("Migration"))
|
|
|
|
|
|
def migrate_dept(dept_name: str, stats: _Statistics, time_elapsed: Profiler):
|
|
time_elapsed.start()
|
|
|
|
absences_query = Absence.query
|
|
dept: Departement = Departement.query.filter_by(acronym=dept_name).first()
|
|
|
|
if dept is None:
|
|
raise ValueError(f"Département inexistant: {dept_name}")
|
|
|
|
etuds_id: list[int] = [etud.id for etud in dept.etudiants]
|
|
for etudid in etuds_id:
|
|
_glob.JUSTIFS[etudid] = []
|
|
absences_query = absences_query.filter(Absence.etudid.in_(etuds_id))
|
|
absences: Absence = absences_query.order_by(
|
|
Absence.etudid, Absence.jour, not_(Absence.matin)
|
|
)
|
|
|
|
absences_len: int = absences.count()
|
|
|
|
if absences_len == 0:
|
|
print(
|
|
f"{TerminalColor.BLUE}Le département {dept_name} ne possède aucune absence.{TerminalColor.RESET}"
|
|
)
|
|
return
|
|
|
|
_glob.DEPT_ETUDIDS = {e.id for e in Identite.query.filter_by(dept_id=dept.id)}
|
|
_glob.COMPTE = [0, 0]
|
|
_glob.ERR_ETU = []
|
|
_glob.MERGER_ASSI = None
|
|
_glob.MERGER_JUST = None
|
|
|
|
print(
|
|
f"{TerminalColor.BLUE}{absences_len} absences du département {dept_name} vont être migrées{TerminalColor.RESET}"
|
|
)
|
|
|
|
print_progress_bar(0, absences_len, "Progression", "effectué", autosize=True)
|
|
|
|
etuds_modimpl_ids = {}
|
|
for i, abs_ in enumerate(absences):
|
|
etud_modimpl_ids = etuds_modimpl_ids.get(abs_.etudid)
|
|
if etud_modimpl_ids is None:
|
|
etud_modimpl_ids = {
|
|
ins.moduleimpl_id
|
|
for ins in ModuleImplInscription.query.filter_by(etudid=abs_.etudid)
|
|
}
|
|
etuds_modimpl_ids[abs_.etudid] = etud_modimpl_ids
|
|
try:
|
|
_from_abs_to_assiduite_justificatif(abs_, etud_modimpl_ids)
|
|
except ValueError as e:
|
|
stats.add_problem(abs_, e.args[0])
|
|
|
|
if i % 10 == 0:
|
|
print_progress_bar(
|
|
i,
|
|
absences_len,
|
|
"Progression",
|
|
"effectué",
|
|
autosize=True,
|
|
)
|
|
|
|
if i % 1000 == 0:
|
|
print_progress_bar(
|
|
i,
|
|
absences_len,
|
|
"Progression",
|
|
"effectué",
|
|
autosize=True,
|
|
)
|
|
_glob.cnx.commit()
|
|
|
|
if _glob.MERGER_ASSI is not None:
|
|
_glob.MERGER_ASSI.export()
|
|
if _glob.MERGER_JUST is not None:
|
|
_glob.MERGER_JUST.export()
|
|
|
|
_glob.cnx.commit()
|
|
|
|
print_progress_bar(
|
|
absences_len,
|
|
absences_len,
|
|
"Progression",
|
|
"effectué",
|
|
autosize=True,
|
|
)
|
|
|
|
# print(
|
|
# TerminalColor.RED
|
|
# + f"Justification des absences du département {dept_name}, veuillez patienter, ceci peut prendre un certain temps."
|
|
# + TerminalColor.RESET
|
|
# )
|
|
|
|
# justifs: Justificatif = Justificatif.query.join(Identite).filter_by(dept_id=dept.id)
|
|
# compute_assiduites_justified(justifs, reset=True)
|
|
|
|
time_elapsed.stop()
|
|
|
|
statistiques: dict = stats.compute_stats()
|
|
print(
|
|
f"{TerminalColor.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {TerminalColor.RESET}"
|
|
)
|
|
|
|
filename = f"/opt/scodoc-data/log/{datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}scodoc_migration_abs_{dept_name}.json"
|
|
if statistiques["total"] > 0:
|
|
print(
|
|
f"{TerminalColor.RED}{statistiques['total']} absences qui n'ont pas pu être migrées."
|
|
)
|
|
print(
|
|
f"Vous retrouverez un fichier json {TerminalColor.GREEN}{filename}{TerminalColor.RED} contenant les problèmes de migrations"
|
|
)
|
|
|
|
with open(
|
|
filename,
|
|
"w",
|
|
encoding="utf-8",
|
|
) as file:
|
|
stats.export(file)
|
|
|
|
print(
|
|
f"{TerminalColor.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés pour le département {dept_name}.{TerminalColor.RESET}"
|
|
)
|
|
|
|
if _glob.DEBUG:
|
|
print(dumps(statistiques, indent=2))
|
|
|
|
|
|
def _from_abs_to_assiduite_justificatif(_abs: Absence, etud_modimpl_ids: set[int]):
|
|
if _abs.etudid not in _glob.DEPT_ETUDIDS:
|
|
raise ValueError("Etudiant inexistant")
|
|
|
|
if _abs.estabs:
|
|
if (_abs.moduleimpl_id is not None) and (
|
|
_abs.moduleimpl_id not in etud_modimpl_ids
|
|
):
|
|
raise ValueError("Moduleimpl_id incorrect ou étudiant non inscrit")
|
|
|
|
if _glob.MERGER_ASSI is None:
|
|
_glob.MERGER_ASSI = _Merger(_abs, True)
|
|
elif _glob.MERGER_ASSI.merge(_abs):
|
|
pass
|
|
else:
|
|
_glob.MERGER_ASSI.export()
|
|
_glob.MERGER_ASSI = _Merger(_abs, True)
|
|
if _abs.estjust:
|
|
if _glob.MERGER_JUST is None:
|
|
_glob.MERGER_JUST = _Merger(_abs, False)
|
|
elif _glob.MERGER_JUST.merge(_abs):
|
|
pass
|
|
else:
|
|
_glob.MERGER_JUST.export()
|
|
_glob.MERGER_JUST = _Merger(_abs, False)
|