422 lines
12 KiB
Python
422 lines
12 KiB
Python
"""
|
|
Script de migration des données de la base "absences" -> "assiduites"/"justificatifs"
|
|
|
|
Ecrit par Matthias HARTMANN
|
|
"""
|
|
from datetime import date, datetime, time, timedelta
|
|
from json import dump, dumps
|
|
from sqlalchemy import not_
|
|
|
|
from app import db
|
|
from app.models import (
|
|
Absence,
|
|
Assiduite,
|
|
Departement,
|
|
Identite,
|
|
Justificatif,
|
|
ModuleImplInscription,
|
|
)
|
|
from app.models.assiduites import (
|
|
compute_assiduites_justified,
|
|
)
|
|
from app.profiler import Profiler
|
|
from app.scodoc.sco_utils import (
|
|
EtatAssiduite,
|
|
EtatJustificatif,
|
|
TerminalColor,
|
|
localize_datetime,
|
|
print_progress_bar,
|
|
)
|
|
|
|
|
|
class _glob:
    """Script-wide mutable state shared by the migration functions.

    The ``MERGER_*`` attributes are annotated with a string forward reference
    because the ``_Merger`` class is defined further down in this module; this
    avoids declaring an empty placeholder class that is redefined later.
    """

    # When true, the per-department statistics are also printed as JSON.
    DEBUG: bool = False
    # Not read by any code visible in this file.
    PROBLEMS: dict[int, list[str]] = {}
    # Student ids already looked up in ``Identite`` for the current department.
    CURRENT_ETU: list[int] = []
    # (etudid, moduleimpl_id) pairs consulted before querying enrolments.
    MODULES: list[tuple[int, int]] = []
    # Counters of generated objects: index 0 = assiduités, 1 = justificatifs.
    COMPTE: list[int] = []
    # Reset for each department; not read by any code visible in this file.
    ERR_ETU: list[int] = []
    # Mergers currently accumulating consecutive half-days (None = none yet).
    MERGER_ASSI: "_Merger | None" = None
    MERGER_JUST: "_Merger | None" = None

    # Boundaries of the two half-days, set by ``migrate_abs_to_assiduites``.
    MORNING: "time | None" = None
    NOON: "time | None" = None
    EVENING: "time | None" = None
|
|
|
|
|
|
class _Merger:
    """Accumulates consecutive half-day ``Absence`` rows of a single student.

    A half-day is represented as a ``(day, is_morning)`` pair. The merger
    starts on one half-day, is extended with :meth:`merge`, and is finally
    turned into one ``Assiduite`` or one ``Justificatif`` by :meth:`export`.
    """

    def __init__(self, abs_: Absence, est_abs: bool) -> None:
        """Start a new range covering only the half-day of *abs_*.

        *est_abs* selects what :meth:`export` produces: an ``Assiduite`` when
        true, a ``Justificatif`` otherwise.
        """
        first_slot = (abs_.jour, abs_.matin)
        self.deb = first_slot
        self.fin = first_slot
        self.etudid = abs_.etudid
        self.est_abs = est_abs
        self.moduleimpl = abs_.moduleimpl_id
        self.raison = abs_.description
        self.entry_date = abs_.entry_date

    def merge(self, abs_: Absence) -> bool:
        """Try to extend the range with *abs_*.

        Returns ``True`` when *abs_* was absorbed, ``False`` when it belongs
        to another student or is not the half-day that directly follows the
        current end of the range.
        """
        if abs_.etudid != self.etudid:
            return False

        slot = (abs_.jour, abs_.matin)
        end_day, end_is_morning = self.fin

        if slot == self.fin:
            # Same half-day recorded several times: the module link is dropped.
            self.moduleimpl = None
        elif end_is_morning:
            # A morning can only be followed by the afternoon of the same day.
            if abs_.jour != end_day:
                return False
        elif abs_.jour - timedelta(days=1) != end_day or not abs_.matin:
            # An afternoon can only be followed by the next day's morning.
            return False

        self.fin = slot
        return True

    @staticmethod
    def _tuple_to_date(couple: tuple[date, bool], end=False):
        """Convert a ``(day, is_morning)`` pair into a localized datetime.

        A morning runs from ``_glob.MORNING`` to ``_glob.NOON`` and an
        afternoon from ``_glob.NOON`` to ``_glob.EVENING``; *end* selects the
        closing bound of the half-day instead of the opening one.
        """
        day, is_morning = couple
        if is_morning:
            clock = _glob.NOON if end else _glob.MORNING
        else:
            clock = _glob.EVENING if end else _glob.NOON
        return localize_datetime(datetime.combine(day, clock))

    def _to_justif(self):
        """Build a validated ``Justificatif`` spanning the merged range."""
        return Justificatif.fast_create_justificatif(
            etudid=self.etudid,
            date_debut=_Merger._tuple_to_date(self.deb),
            date_fin=_Merger._tuple_to_date(self.fin, end=True),
            etat=EtatJustificatif.VALIDE,
            raison=self.raison,
            entry_date=self.entry_date,
        )

    def _to_assi(self):
        """Build an ``Assiduite`` in the ABSENT state spanning the merged range."""
        return Assiduite.fast_create_assiduite(
            etudid=self.etudid,
            date_debut=_Merger._tuple_to_date(self.deb),
            date_fin=_Merger._tuple_to_date(self.fin, end=True),
            etat=EtatAssiduite.ABSENT,
            moduleimpl_id=self.moduleimpl,
            description=self.raison,
            entry_date=self.entry_date,
        )

    def export(self):
        """Create the Assiduite or Justificatif and add it to the DB session.

        The matching counter in ``_glob.COMPTE`` is incremented before the
        object is built (index 0 for assiduités, 1 for justificatifs).
        """
        if self.est_abs:
            counter_index, build = 0, self._to_assi
        else:
            counter_index, build = 1, self._to_justif

        _glob.COMPTE[counter_index] += 1
        db.session.add(build())
|
|
|
|
|
|
class _Statistics:
    """Collects the absences that could not be migrated, grouped by year."""

    def __init__(self) -> None:
        # Year currently selected by __set_year (None until the first problem).
        self.year: int = None
        # "total" -> number of problems; <year> -> details for that year.
        self.object = {"total": 0}

    def __set_year(self, year: int):
        """Select *year* as current, creating its entry on first use."""
        self.object.setdefault(
            year,
            {
                "etuds_inexistant": [],
                "abs_invalide": {},
            },
        )
        self.year = year
        return self

    def __add_etud(self, etudid: int):
        """Record *etudid* once as an unknown student of the current year."""
        unknown_students = self.object[self.year]["etuds_inexistant"]
        if etudid not in unknown_students:
            unknown_students.append(etudid)
        return self

    def __add_abs(self, abs_: int, err: str):
        """Append *err* to the errors of absence id *abs_* for the current year."""
        invalid = self.object[self.year]["abs_invalide"]
        invalid.setdefault(abs_, []).append(err)
        return self

    def add_problem(self, abs_: Absence, err: str):
        """Register a new problem in the statistics.

        Absences dated after September 15th are filed under the following
        calendar year, the others under their own calendar year.
        """
        day = abs_.jour
        year = day.year + 1 if day > date(day.year, 9, 15) else day.year
        self.__set_year(year)

        if err == "Etudiant inexistant":
            self.__add_etud(abs_.etudid)
        else:
            self.__add_abs(abs_.id, err)

        self.object["total"] += 1

    def compute_stats(self) -> dict:
        """Return the problem counts: the total, then one entry per year."""
        stats: dict = {"total": self.object["total"]}
        stats.update(
            {
                year: {
                    "etuds_inexistant": len(item["etuds_inexistant"]),
                    "abs_invalide": len(item["abs_invalide"]),
                }
                for year, item in self.object.items()
                if year != "total"
            }
        )
        return stats

    def export(self, file):
        """Serialize the collected problems as JSON into the open *file*."""
        dump(self.object, file, indent=2)
|
|
|
|
|
|
def _parse_time_pref(value: str, default: time) -> time:
    """Convert an "8h30"-style string into a ``time``; ``None`` gives *default*."""
    if value is None:
        return default
    parts = value.strip().lower().split("h")
    hour = int(parts[0])
    # "8h" and "8" carry no minutes: read them as o'clock instead of failing.
    minute = int(parts[1]) if len(parts) > 1 and parts[1].strip() else 0
    return time(hour, minute)


def migrate_abs_to_assiduites(
    dept: str = None,
    morning: str = None,
    noon: str = None,
    evening: str = None,
    debug: bool = False,
):
    """Migrate the old "absences" rows to assiduités and justificatifs.

    An old absence row has three possible states:

        |.estabs|.estjust|
        |   1   |    0   | -> unjustified absence
        |   1   |    1   | -> justified absence
        |   0   |    1   | -> justification

    Fields of an old absence row:

        .matin: bool          half-day flag (morning / afternoon)
        .jour: date           day of the absence / justification
        .moduleimpl_id        relation -> moduleimpl_id
        .description: str     reason of the absence / justification
        .entry_date: datetime timestamp at which the row was entered
        .etudid               relation -> Identite

    Args:
        dept: acronym of the only department to migrate; when ``None`` every
            department is migrated, ordered by id.
        morning: start of the morning, written "XhYY" (default 8h00).
        noon: end of the morning / start of the afternoon (default 12h00).
        evening: end of the afternoon (default 18h00).
        debug: stored in ``_glob.DEBUG``.

    Raises:
        ValueError: if a time string cannot be parsed or is out of range.
    """
    Profiler.clear()

    _glob.DEBUG = debug
    _glob.MORNING = _parse_time_pref(morning, time(8, 0))
    _glob.NOON = _parse_time_pref(noon, time(12, 0))
    _glob.EVENING = _parse_time_pref(evening, time(18, 0))

    if dept is not None:
        migrate_dept(dept, _Statistics(), Profiler("Migration"))
        return

    prof_total = Profiler("MigrationTotal")
    prof_total.start()
    for depart in Departement.query.order_by(Departement.id):
        migrate_dept(
            depart.acronym, _Statistics(), Profiler(f"Migration_{depart.acronym}")
        )
    prof_total.stop()

    print(
        TerminalColor.GREEN
        + f"Fin de la migration, elle a durée {prof_total.elapsed():.2f}"
        + TerminalColor.RESET
    )
|
|
|
|
|
|
def migrate_dept(dept_name: str, stats: _Statistics, time_elapsed: Profiler):
    """Migrate every old absence of the department whose acronym is *dept_name*.

    Absences are read ordered by student, day and half-day (morning first),
    handed one by one to ``_from_abs_to_assiduite_justificatif`` and committed
    every 1000 rows. Rows rejected with a ``ValueError`` are recorded in
    *stats*, which is finally written to
    ``/opt/scodoc-data/log/scodoc_migration_abs_<dept_name>.json``.

    Args:
        dept_name: acronym of the department to migrate.
        stats: collector receiving the rows that could not be migrated.
        time_elapsed: profiler started on entry and stopped once the
            justification pass is done (it is left running on early return).
    """
    time_elapsed.start()

    dept = Departement.query.filter_by(acronym=dept_name).first()
    if dept is None:
        # Tell the operator instead of silently doing nothing.
        print(
            f"{TerminalColor.RED}Le département {dept_name} est introuvable.{TerminalColor.RESET}"
        )
        return

    etuds_id: list[int] = [etud.id for etud in dept.etudiants]
    absences = Absence.query.filter(Absence.etudid.in_(etuds_id)).order_by(
        Absence.etudid, Absence.jour, not_(Absence.matin)
    )

    absences_len: int = absences.count()
    if absences_len == 0:
        print(
            f"{TerminalColor.BLUE}Le département {dept_name} ne possède aucune absence.{TerminalColor.RESET}"
        )
        return

    # Reset the per-department state.
    _glob.CURRENT_ETU = []
    _glob.MODULES = []
    _glob.COMPTE = [0, 0]
    _glob.ERR_ETU = []
    _glob.MERGER_ASSI = None
    _glob.MERGER_JUST = None

    print(
        f"{TerminalColor.BLUE}{absences_len} absences du département {dept_name} vont être migrées{TerminalColor.RESET}"
    )
    print_progress_bar(0, absences_len, "Progression", "effectué", autosize=True)

    for i, abs_ in enumerate(absences):
        try:
            _from_abs_to_assiduite_justificatif(abs_)
        except ValueError as e:
            stats.add_problem(abs_, str(e))

        if i % 10 == 0:
            print_progress_bar(
                i,
                absences_len,
                "Progression",
                "effectué",
                autosize=True,
            )
        if i % 1000 == 0:
            db.session.commit()

    # Flush the ranges still being accumulated. A merger stays None when the
    # department has no row of that kind (or when all of them were rejected).
    for merger in (_glob.MERGER_ASSI, _glob.MERGER_JUST):
        if merger is not None:
            merger.export()

    db.session.commit()

    # NOTE(review): the previous code called
    # ``justifs.filter(Justificatif.etudid.in_(etuds_id))`` without keeping the
    # result, so the unfiltered query has always been passed below. Before
    # restricting it to this department, confirm how
    # ``compute_assiduites_justified(..., reset=True)`` treats rows that are
    # outside the query it receives.
    justifs = Justificatif.query

    print_progress_bar(
        absences_len,
        absences_len,
        "Progression",
        "effectué",
        autosize=True,
    )

    print(
        TerminalColor.RED
        + f"Justification des absences du département {dept_name}, veuillez patienter, ceci peut prendre un certain temps."
        + TerminalColor.RESET
    )

    compute_assiduites_justified(justifs, reset=True)

    time_elapsed.stop()

    statistiques: dict = stats.compute_stats()
    print(
        f"{TerminalColor.GREEN}La migration a pris {time_elapsed.elapsed():.2f} secondes {TerminalColor.RESET}"
    )

    report_path = f"/opt/scodoc-data/log/scodoc_migration_abs_{dept_name}.json"
    # Each message resets the colour so that a failure while writing the
    # report does not leave the terminal in red.
    print(
        f"{TerminalColor.RED}{statistiques['total']} absences qui n'ont pas pu être migrées.{TerminalColor.RESET}"
    )
    print(
        f"{TerminalColor.RED}Vous retrouverez un fichier json {TerminalColor.GREEN}{report_path}{TerminalColor.RED} contenant les problèmes de migrations{TerminalColor.RESET}"
    )
    with open(report_path, "w", encoding="utf-8") as file:
        stats.export(file)

    print(
        f"{TerminalColor.CYAN}{_glob.COMPTE[0]} assiduités et {_glob.COMPTE[1]} justificatifs ont été générés pour le département {dept_name}.{TerminalColor.RESET}"
    )

    if _glob.DEBUG:
        print(dumps(statistiques, indent=2))
|
|
|
|
|
|
def _push_to_merger(slot: str, _abs: Absence, est_abs: bool) -> bool:
    """Feed *_abs* to the merger stored in ``_glob.<slot>``, flushing it if the run breaks."""
    current = getattr(_glob, slot)
    if current is not None:
        if current.merge(_abs):
            return True
        # _abs does not continue the current range: emit it and start anew.
        current.export()
    setattr(_glob, slot, _Merger(_abs, est_abs))
    return current is None


def _from_abs_to_assiduite_justificatif(_abs: Absence):
    """Validate one old absence row and hand it to the matching merger.

    Rows with ``estabs`` set go to ``_glob.MERGER_ASSI`` (future assiduités),
    the others to ``_glob.MERGER_JUST`` (future justificatifs).

    Returns:
        ``False`` when the row closed the current range (which was exported)
        and started a new one, ``True`` otherwise.

    Raises:
        ValueError: "Etudiant inexistant" when the student is unknown, or
            "Moduleimpl_id incorrect ou étudiant non inscrit" when the row
            references a module the student is not enrolled in. The first
            message is matched literally by ``_Statistics.add_problem``.
    """
    if _abs.etudid not in _glob.CURRENT_ETU:
        if Identite.query.filter_by(id=_abs.etudid).first() is None:
            raise ValueError("Etudiant inexistant")
        _glob.CURRENT_ETU.append(_abs.etudid)

    if not _abs.estabs:
        return _push_to_merger("MERGER_JUST", _abs, False)

    inscription_key = (_abs.etudid, _abs.moduleimpl_id)
    if _abs.moduleimpl_id is not None and inscription_key not in _glob.MODULES:
        moduleimpl_inscription = ModuleImplInscription.query.filter_by(
            moduleimpl_id=_abs.moduleimpl_id, etudid=_abs.etudid
        ).first()
        if moduleimpl_inscription is None:
            raise ValueError("Moduleimpl_id incorrect ou étudiant non inscrit")
        # Remember validated pairs: _glob.MODULES was consulted above but
        # never filled, so the enrolment was queried again for every row.
        _glob.MODULES.append(inscription_key)

    return _push_to_merger("MERGER_ASSI", _abs, True)
|