1
0
forked from ScoDoc/ScoDoc
ScoDoc/app/models/assiduites.py

765 lines
27 KiB
Python

# -*- coding: UTF-8 -*
"""Gestion de l'assiduité (assiduités + justificatifs)
"""
from datetime import datetime
from flask_login import current_user
from flask_sqlalchemy.query import Query
from sqlalchemy.exc import DataError
from app import db, log, g, set_sco_dept
from app.models import (
ModuleImpl,
Module,
Scolog,
FormSemestre,
FormSemestreInscription,
ScoDocModel,
)
from app.models.etudiants import Identite
from app.auth.models import User
from app.scodoc import sco_abs_notification
from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
from app.scodoc.sco_exceptions import ScoValueError
from app.scodoc.sco_permissions import Permission
from app.scodoc.sco_utils import (
EtatAssiduite,
EtatJustificatif,
localize_datetime,
is_assiduites_module_forced,
NonWorkDays,
)
class Assiduite(ScoDocModel):
"""
Représente une assiduité:
- une plage horaire lié à un état et un étudiant
- un module si spécifiée
- une description si spécifiée
"""
__tablename__ = "assiduites"
id = db.Column(db.Integer, primary_key=True, nullable=False)
assiduite_id = db.synonym("id")
date_debut = db.Column(
db.DateTime(timezone=True), server_default=db.func.now(), nullable=False
)
date_fin = db.Column(
db.DateTime(timezone=True), server_default=db.func.now(), nullable=False
)
moduleimpl_id = db.Column(
db.Integer,
db.ForeignKey("notes_moduleimpl.id", ondelete="SET NULL"),
)
etudid = db.Column(
db.Integer,
db.ForeignKey("identite.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
etat = db.Column(db.Integer, nullable=False)
description = db.Column(db.Text)
entry_date = db.Column(db.DateTime(timezone=True), server_default=db.func.now())
user_id = db.Column(
db.Integer,
db.ForeignKey("user.id", ondelete="SET NULL"),
nullable=True,
)
est_just = db.Column(db.Boolean, server_default="false", nullable=False)
external_data = db.Column(db.JSON, nullable=True)
# Déclare la relation "joined" car on va très souvent vouloir récupérer
# l'étudiant en même tant que l'assiduité (perf.: évite nouvelle requete SQL)
etudiant = db.relationship("Identite", back_populates="assiduites", lazy="joined")
# En revanche, user est rarement accédé:
user = db.relationship(
"User",
backref=db.backref(
"assiduites", lazy="select", order_by="Assiduite.entry_date"
),
lazy="select",
)
def to_dict(self, format_api=True) -> dict:
"""Retourne la représentation json de l'assiduité"""
etat = self.etat
user: User | None = None
if format_api:
# format api utilise les noms "present,absent,retard" au lieu des int
etat = EtatAssiduite.inverse().get(self.etat).name
if self.user_id is not None:
user = db.session.get(User, self.user_id)
data = {
"assiduite_id": self.id,
"etudid": self.etudid,
"code_nip": self.etudiant.code_nip,
"moduleimpl_id": self.moduleimpl_id,
"date_debut": self.date_debut,
"date_fin": self.date_fin,
"etat": etat,
"desc": self.description,
"entry_date": self.entry_date,
"user_id": None if user is None else user.id, # l'uid
"user_name": None if user is None else user.user_name, # le login
"user_nom_complet": None
if user is None
else user.get_nomcomplet(), # "Marie Dupont"
"est_just": self.est_just,
"external_data": self.external_data,
}
return data
def __str__(self) -> str:
"chaine pour journaux et debug (lisible par humain français)"
try:
etat_str = EtatAssiduite(self.etat).name.lower().capitalize()
except ValueError:
etat_str = "Invalide"
return f"""{etat_str} {
"just." if self.est_just else "non just."
} de {
self.date_debut.strftime("%d/%m/%Y %Hh%M")
} à {
self.date_fin.strftime("%d/%m/%Y %Hh%M")
}"""
@classmethod
def create_assiduite(
cls,
etud: Identite,
date_debut: datetime,
date_fin: datetime,
etat: EtatAssiduite,
moduleimpl: ModuleImpl = None,
description: str = None,
entry_date: datetime = None,
user_id: int = None,
est_just: bool = False,
external_data: dict = None,
notify_mail=False,
) -> "Assiduite":
"""Créer une nouvelle assiduité pour l'étudiant.
Les datetime doivent être en timezone serveur.
Raises ScoValueError en cas de conflit ou erreur.
"""
if date_debut.tzinfo is None:
log(
f"Warning: create_assiduite: date_debut without timezone ({date_debut})"
)
if date_fin.tzinfo is None:
log(f"Warning: create_assiduite: date_fin without timezone ({date_fin})")
# Vérification jours non travaillés
# -> vérifie si la date de début ou la date de fin est sur un jour non travaillé
# On récupère les formsemestres des dates de début et de fin
formsemestre_date_debut: FormSemestre = get_formsemestre_from_data(
{
"etudid": etud.id,
"date_debut": date_debut,
"date_fin": date_debut,
}
)
formsemestre_date_fin: FormSemestre = get_formsemestre_from_data(
{
"etudid": etud.id,
"date_debut": date_fin,
"date_fin": date_fin,
}
)
if date_debut.weekday() in NonWorkDays.get_all_non_work_days(
formsemestre_id=formsemestre_date_debut
):
raise ScoValueError("La date de début n'est pas un jour travaillé")
if date_fin.weekday() in NonWorkDays.get_all_non_work_days(
formsemestre_id=formsemestre_date_fin
):
raise ScoValueError("La date de fin n'est pas un jour travaillé")
# Vérification de non duplication des périodes
assiduites: Query = etud.assiduites
if is_period_conflicting(date_debut, date_fin, assiduites, Assiduite):
log(
f"""create_assiduite: period_conflicting etudid={etud.id} date_debut={
date_debut} date_fin={date_fin}"""
)
raise ScoValueError(
"Duplication: la période rentre en conflit avec une plage enregistrée"
)
if not est_just:
est_just = (
len(
get_justifs_from_date(etud.etudid, date_debut, date_fin, valid=True)
)
> 0
)
moduleimpl_id = None
if moduleimpl is not None:
# Vérification de l'inscription de l'étudiant
if moduleimpl.est_inscrit(etud):
moduleimpl_id = moduleimpl.id
else:
raise ScoValueError("L'étudiant n'est pas inscrit au module")
elif not (
external_data is not None and external_data.get("module") is not None
):
# Vérification si module forcé
formsemestre: FormSemestre = get_formsemestre_from_data(
{"etudid": etud.id, "date_debut": date_debut, "date_fin": date_fin}
)
force: bool
if formsemestre:
force = is_assiduites_module_forced(formsemestre_id=formsemestre.id)
else:
force = is_assiduites_module_forced(dept_id=etud.dept_id)
if force:
raise ScoValueError("Module non renseigné")
nouv_assiduite = Assiduite(
date_debut=date_debut,
date_fin=date_fin,
description=description,
entry_date=entry_date,
est_just=est_just,
etat=etat,
etudiant=etud,
external_data=external_data,
moduleimpl_id=moduleimpl_id,
user_id=user_id,
)
db.session.add(nouv_assiduite)
db.session.flush()
log(f"create_assiduite: {etud.id} id={nouv_assiduite.id} {nouv_assiduite}")
Scolog.logdb(
method="create_assiduite",
etudid=etud.id,
msg=f"assiduité: {nouv_assiduite}",
)
if notify_mail and etat == EtatAssiduite.ABSENT:
sco_abs_notification.abs_notify(etud.id, nouv_assiduite.date_debut)
return nouv_assiduite
def set_moduleimpl(self, moduleimpl_id: int | str):
"""Mise à jour du moduleimpl_id
Les valeurs du champs "moduleimpl_id" possibles sont :
- <int> (un id classique)
- <str> ("autre" ou "<id>")
- None (pas de moduleimpl_id)
Si la valeur est "autre" il faut:
- mettre à None assiduité.moduleimpl_id
- mettre à jour assiduite.external_data["module"] = "autre"
En fonction de la configuration du semestre la valeur `None` peut-être considérée comme invalide.
- Il faudra donc vérifier que ce n'est pas le cas avant de mettre à jour l'assiduité
"""
moduleimpl: ModuleImpl = None
try:
# ne lève une erreur que si moduleimpl_id est une chaine de caractère non parsable (parseInt)
moduleimpl: ModuleImpl = ModuleImpl.query.get(moduleimpl_id)
# moduleImpl est soit :
# - None si moduleimpl_id==None
# - None si moduleimpl_id==<int> non reconnu
# - ModuleImpl si <int|str> valide
# Vérification ModuleImpl not None (raise ScoValueError)
if moduleimpl is None and self._check_force_module(moduleimpl):
# Ici uniquement si on est autorisé à ne pas avoir de module
self.moduleimpl_id = None
return
# Vérification Inscription ModuleImpl (raise ScoValueError)
if moduleimpl.est_inscrit(self.etudiant):
self.moduleimpl_id = moduleimpl.id
else:
raise ScoValueError("L'étudiant n'est pas inscrit au module")
except DataError:
# On arrive ici si moduleimpl_id == "autre" ou moduleimpl_id == <str> non parsé
if moduleimpl_id != "autre":
raise ScoValueError("Module non reconnu")
# Configuration de external_data pour Module Autre
# Si self.external_data None alors on créé un dictionnaire {"module": "autre"}
# Sinon on met à jour external_data["module"] à "autre"
if self.external_data is None:
self.external_data = {"module": "autre"}
else:
self.external_data["module"] = "autre"
# Dans tous les cas une fois fait, assiduite.moduleimpl_id doit être None
self.moduleimpl_id = None
# Ici pas de vérification du force module car on l'a mis dans "external_data"
def supprime(self):
"Supprime l'assiduité. Log et commit."
from app.scodoc import sco_assiduites as scass
if g.scodoc_dept is None and self.etudiant.dept_id is not None:
# route sans département
set_sco_dept(self.etudiant.departement.acronym)
obj_dict: dict = self.to_dict()
# Suppression de l'objet et LOG
log(f"delete_assidutite: {self.etudiant.id} {self}")
Scolog.logdb(
method="delete_assiduite",
etudid=self.etudiant.id,
msg=f"Assiduité: {self}",
)
db.session.delete(self)
db.session.commit()
# Invalidation du cache
scass.simple_invalidate_cache(obj_dict)
def get_formsemestre(self) -> FormSemestre:
"""Le formsemestre associé.
Attention: en cas d'inscription multiple prend arbitrairement l'un des semestres.
A utiliser avec précaution !
"""
return get_formsemestre_from_data(self.to_dict())
def get_module(self, traduire: bool = False) -> int | str:
"TODO"
if self.moduleimpl_id is not None:
if traduire:
modimpl: ModuleImpl = ModuleImpl.query.get(self.moduleimpl_id)
mod: Module = Module.query.get(modimpl.module_id)
return f"{mod.code} {mod.titre}"
elif self.external_data is not None and "module" in self.external_data:
return (
"Tout module"
if self.external_data["module"] == "Autre"
else self.external_data["module"]
)
return "Non spécifié" if traduire else None
def get_saisie(self) -> str:
"""
retourne le texte "saisie le <date> par <User>"
"""
date: str = self.entry_date.strftime("%d/%m/%Y à %H:%M")
utilisateur: str = ""
if self.user != None:
self.user: User
utilisateur = f"par {self.user.get_prenomnom()}"
return f"saisie le {date} {utilisateur}"
def _check_force_module(self, moduleimpl: ModuleImpl) -> bool:
# Vérification si module forcé
formsemestre: FormSemestre = get_formsemestre_from_data(
{
"etudid": self.etudid,
"date_debut": self.date_debut,
"date_fin": self.date_fin,
}
)
force: bool
if formsemestre:
force = is_assiduites_module_forced(formsemestre_id=formsemestre.id)
else:
force = is_assiduites_module_forced(dept_id=self.etudiant.dept_id)
if force:
raise ScoValueError("Module non renseigné")
return True
class Justificatif(ScoDocModel):
"""
Représente un justificatif:
- une plage horaire lié à un état et un étudiant
- une raison si spécifiée
- un fichier si spécifié
"""
__tablename__ = "justificatifs"
id = db.Column(db.Integer, primary_key=True)
justif_id = db.synonym("id")
date_debut = db.Column(
db.DateTime(timezone=True), server_default=db.func.now(), nullable=False
)
date_fin = db.Column(
db.DateTime(timezone=True), server_default=db.func.now(), nullable=False
)
etudid = db.Column(
db.Integer,
db.ForeignKey("identite.id", ondelete="CASCADE"),
index=True,
nullable=False,
)
etat = db.Column(
db.Integer,
nullable=False,
)
entry_date = db.Column(db.DateTime(timezone=True), server_default=db.func.now())
"date de création de l'élément: date de saisie"
# pourrait devenir date de dépot au secrétariat, si différente
user_id = db.Column(
db.Integer,
db.ForeignKey("user.id", ondelete="SET NULL"),
nullable=True,
index=True,
)
raison = db.Column(db.Text())
# Archive_id -> sco_archives_justificatifs.py
fichier = db.Column(db.Text())
# Déclare la relation "joined" car on va très souvent vouloir récupérer
# l'étudiant en même tant que le justificatif (perf.: évite nouvelle requete SQL)
etudiant = db.relationship(
"Identite", back_populates="justificatifs", lazy="joined"
)
# En revanche, user est rarement accédé:
user = db.relationship(
"User",
backref=db.backref(
"justificatifs", lazy="select", order_by="Justificatif.entry_date"
),
lazy="select",
)
external_data = db.Column(db.JSON, nullable=True)
@classmethod
def get_justificatif(cls, justif_id: int) -> "Justificatif":
"""Justificatif ou 404, cherche uniquement dans le département courant"""
query = Justificatif.query.filter_by(id=justif_id)
if g.scodoc_dept:
query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id)
return query.first_or_404()
def to_dict(self, format_api: bool = False) -> dict:
"""transformation de l'objet en dictionnaire sérialisable"""
etat = self.etat
user: User = self.user if self.user_id is not None else None
if format_api:
etat = EtatJustificatif.inverse().get(self.etat).name
data = {
"justif_id": self.justif_id,
"etudid": self.etudid,
"code_nip": self.etudiant.code_nip,
"date_debut": self.date_debut,
"date_fin": self.date_fin,
"etat": etat,
"raison": self.raison,
"fichier": self.fichier,
"entry_date": self.entry_date,
"user_id": None if user is None else user.id, # l'uid
"user_name": None if user is None else user.user_name, # le login
"user_nom_complet": None if user is None else user.get_nomcomplet(),
"external_data": self.external_data,
}
return data
def __repr__(self) -> str:
"chaine pour journaux et debug (lisible par humain français)"
try:
etat_str = EtatJustificatif(self.etat).name
except ValueError:
etat_str = "Invalide"
return f"""Justificatif id={self.id} {etat_str} de {
self.date_debut.strftime("%d/%m/%Y %Hh%M")
} à {
self.date_fin.strftime("%d/%m/%Y %Hh%M")
}"""
@classmethod
def convert_dict_fields(cls, args: dict) -> dict:
"""Convert fields. Called by ScoDocModel's create_from_dict, edit and from_dict
Raises ScoValueError si paramètres incorrects.
"""
if not isinstance(args["date_debut"], datetime) or not isinstance(
args["date_fin"], datetime
):
raise ScoValueError("type date incorrect")
if args["date_fin"] <= args["date_debut"]:
raise ScoValueError("dates incompatibles")
if args["entry_date"] and not isinstance(args["entry_date"], datetime):
raise ScoValueError("type entry_date incorrect")
return args
@classmethod
def create_justificatif(
cls,
etudiant: Identite,
date_debut: datetime,
date_fin: datetime,
etat: EtatJustificatif,
raison: str = None,
entry_date: datetime = None,
user_id: int = None,
external_data: dict = None,
) -> "Justificatif":
"""Créer un nouveau justificatif pour l'étudiant.
Raises ScoValueError si paramètres incorrects.
"""
nouv_justificatif = cls.create_from_dict(locals())
db.session.commit()
log(f"create_justificatif: etudid={etudiant.id} {nouv_justificatif}")
Scolog.logdb(
method="create_justificatif",
etudid=etudiant.id,
msg=f"justificatif: {nouv_justificatif}",
)
return nouv_justificatif
def supprime(self):
"Supprime le justificatif. Log et commit."
from app.scodoc import sco_assiduites as scass
from app.scodoc.sco_archives_justificatifs import JustificatifArchiver
# Récupération de l'archive du justificatif
archive_name: str = self.fichier
if archive_name is not None:
# Si elle existe : on essaye de la supprimer
archiver: JustificatifArchiver = JustificatifArchiver()
try:
archiver.delete_justificatif(self.etudiant, archive_name)
except ValueError:
pass
if g.scodoc_dept is None and self.etudiant.dept_id is not None:
# route sans département
set_sco_dept(self.etudiant.departement.acronym)
# On invalide le cache
scass.simple_invalidate_cache(self.to_dict())
# Suppression de l'objet et LOG
log(f"delete_justificatif: {self.etudiant.id} {self}")
Scolog.logdb(
method="delete_justificatif",
etudid=self.etudiant.id,
msg=f"Justificatif: {self}",
)
db.session.delete(self)
db.session.commit()
# On actualise les assiduités justifiées de l'étudiant concerné
compute_assiduites_justified(
self.etudid,
Justificatif.query.filter_by(etudid=self.etudid).all(),
True,
)
def get_fichiers(self) -> tuple[list[str], int]:
"""Renvoie la liste des noms de fichiers justicatifs
accessibles par l'utilisateur courant et le nombre total
de fichiers.
(ces fichiers sont dans l'archive associée)
"""
if self.fichier is None:
return [], 0
archive_name: str = self.fichier
archiver: JustificatifArchiver = JustificatifArchiver()
filenames = archiver.list_justificatifs(archive_name, self.etudiant)
accessible_filenames = []
#
for filename in filenames:
if int(filename[1]) == current_user.id or current_user.has_permission(
Permission.AbsJustifView
):
accessible_filenames.append(filename[0])
return accessible_filenames, len(filenames)
def is_period_conflicting(
date_debut: datetime,
date_fin: datetime,
collection: Query,
collection_cls: Assiduite | Justificatif,
) -> bool:
"""
Vérifie si une date n'entre pas en collision
avec les justificatifs ou assiduites déjà présentes
"""
# On s'assure que les dates soient avec TimeZone
date_debut = localize_datetime(date_debut)
date_fin = localize_datetime(date_fin)
count: int = collection.filter(
collection_cls.date_debut < date_fin, collection_cls.date_fin > date_debut
).count()
return count > 0
def compute_assiduites_justified(
etudid: int, justificatifs: list[Justificatif] = None, reset: bool = False
) -> list[int]:
"""
Args:
etudid (int): l'identifiant de l'étudiant
justificatifs (list[Justificatif]): La liste des justificatifs qui seront utilisés
reset (bool, optional): remet à false les assiduites non justifiés. Defaults to False.
Returns:
list[int]: la liste des assiduités qui ont été justifiées.
"""
# TODO à optimiser (car très long avec 40000 assiduités)
# Si on ne donne pas de justificatifs on prendra par défaut tous les justificatifs de l'étudiant
if justificatifs is None:
justificatifs: list[Justificatif] = Justificatif.query.filter_by(
etudid=etudid
).all()
# On ne prend que les justificatifs valides
justificatifs = [j for j in justificatifs if j.etat == EtatJustificatif.VALIDE]
# On récupère les assiduités de l'étudiant
assiduites: Assiduite = Assiduite.query.filter_by(etudid=etudid)
assiduites_justifiees: list[int] = []
for assi in assiduites:
# On ne justifie pas les Présences
if assi.etat == EtatAssiduite.PRESENT:
continue
# On récupère les justificatifs qui justifient l'assiduité `assi`
assi_justificatifs = Justificatif.query.filter(
Justificatif.etudid == assi.etudid,
Justificatif.date_debut <= assi.date_debut,
Justificatif.date_fin >= assi.date_fin,
Justificatif.etat == EtatJustificatif.VALIDE,
).all()
# Si au moins un justificatif possède une période qui couvre l'assiduité
if any(
assi.date_debut >= j.date_debut and assi.date_fin <= j.date_fin
for j in justificatifs + assi_justificatifs
):
# On justifie l'assiduité
# On ajoute l'id de l'assiduité à la liste des assiduités justifiées
assi.est_just = True
assiduites_justifiees.append(assi.assiduite_id)
db.session.add(assi)
elif reset:
# Si le paramètre reset est Vrai alors les assiduités non justifiées
# sont remise en "non justifiée"
assi.est_just = False
db.session.add(assi)
# On valide la session
db.session.commit()
# On renvoie la liste des assiduite_id des assiduités justifiées
return assiduites_justifiees
def get_assiduites_justif(assiduite_id: int, long: bool) -> list[int | dict]:
"""
get_assiduites_justif Récupération des justificatifs d'une assiduité
Args:
assiduite_id (int): l'identifiant de l'assiduité
long (bool): Retourner des dictionnaires à la place
des identifiants des justificatifs
Returns:
list[int | dict]: La liste des justificatifs (par défaut uniquement
les identifiants, sinon les dict si long est vrai)
"""
assi: Assiduite = Assiduite.query.get_or_404(assiduite_id)
return get_justifs_from_date(assi.etudid, assi.date_debut, assi.date_fin, long)
def get_justifs_from_date(
etudid: int,
date_debut: datetime,
date_fin: datetime,
long: bool = False,
valid: bool = False,
) -> list[int | dict]:
"""
get_justifs_from_date Récupération des justificatifs couvrant une période pour un étudiant donné
Args:
etudid (int): l'identifiant de l'étudiant
date_debut (datetime): la date de début (datetime avec timezone)
date_fin (datetime): la date de fin (datetime avec timezone)
long (bool, optional): Définition de la sortie.
Vrai pour avoir les dictionnaires des justificatifs.
Faux pour avoir uniquement les identifiants
Defaults to False.
valid (bool, optional): Filtre pour n'avoir que les justificatifs valide.
Si vrai : le retour ne contiendra que des justificatifs valides
Sinon le retour contiendra tout type de justificatifs
Defaults to False.
Returns:
list[int | dict]: La liste des justificatifs (par défaut uniquement
les identifiants, sinon les dict si long est vrai)
"""
# On récupère les justificatifs d'un étudiant couvrant la période donnée
justifs: Query = Justificatif.query.filter(
Justificatif.etudid == etudid,
Justificatif.date_debut <= date_debut,
Justificatif.date_fin >= date_fin,
)
# si valide est vrai alors on filtre pour n'avoir que les justificatifs valide
if valid:
justifs = justifs.filter(Justificatif.etat == EtatJustificatif.VALIDE)
# On renvoie la liste des id des justificatifs si long est Faux,
# sinon on renvoie les dicts des justificatifs
if long:
return [j.to_dict(True) for j in justifs]
return [j.justif_id for j in justifs]
def get_formsemestre_from_data(data: dict[str, datetime | int]) -> FormSemestre:
"""
get_formsemestre_from_data récupère un formsemestre en fonction des données passées
Si l'étudiant est inscrit à plusieurs formsemestre, prend le premier.
Args:
data (dict[str, datetime | int]): Une représentation simplifiée d'une
assiduité ou d'un justificatif
data = {
"etudid" : int,
"date_debut": datetime (tz),
"date_fin": datetime (tz),
}
Returns:
FormSemestre: Le formsemestre trouvé ou None
"""
return (
FormSemestre.query.join(
FormSemestreInscription,
FormSemestre.id == FormSemestreInscription.formsemestre_id,
)
.filter(
data["date_debut"] <= FormSemestre.date_fin,
data["date_fin"] >= FormSemestre.date_debut,
FormSemestreInscription.etudid == data["etudid"],
)
.first()
)