# -*- coding: UTF-8 -* """Gestion de l'assiduité (assiduités + justificatifs) """ from datetime import datetime from flask_login import current_user from flask_sqlalchemy.query import Query from sqlalchemy.exc import DataError from app import db, log, g, set_sco_dept from app.models import ( ModuleImpl, Module, Scolog, FormSemestre, FormSemestreInscription, ScoDocModel, ) from app.models.etudiants import Identite from app.auth.models import User from app.scodoc import sco_abs_notification from app.scodoc.sco_archives_justificatifs import JustificatifArchiver from app.scodoc.sco_exceptions import ScoValueError from app.scodoc.sco_permissions import Permission from app.scodoc.sco_utils import ( EtatAssiduite, EtatJustificatif, localize_datetime, is_assiduites_module_forced, NonWorkDays, ) class Assiduite(ScoDocModel): """ Représente une assiduité: - une plage horaire lié à un état et un étudiant - un module si spécifiée - une description si spécifiée """ __tablename__ = "assiduites" id = db.Column(db.Integer, primary_key=True, nullable=False) assiduite_id = db.synonym("id") date_debut = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) date_fin = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) moduleimpl_id = db.Column( db.Integer, db.ForeignKey("notes_moduleimpl.id", ondelete="SET NULL"), ) etudid = db.Column( db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE"), index=True, nullable=False, ) etat = db.Column(db.Integer, nullable=False) description = db.Column(db.Text) entry_date = db.Column(db.DateTime(timezone=True), server_default=db.func.now()) user_id = db.Column( db.Integer, db.ForeignKey("user.id", ondelete="SET NULL"), nullable=True, ) est_just = db.Column(db.Boolean, server_default="false", nullable=False) external_data = db.Column(db.JSON, nullable=True) # Déclare la relation "joined" car on va très souvent vouloir récupérer # l'étudiant en même tant que l'assiduité (perf.: évite nouvelle requete SQL) etudiant = db.relationship("Identite", back_populates="assiduites", lazy="joined") # En revanche, user est rarement accédé: user = db.relationship( "User", backref=db.backref( "assiduites", lazy="select", order_by="Assiduite.entry_date" ), lazy="select", ) def to_dict(self, format_api=True, restrict: bool | None = None) -> dict: """Retourne la représentation json de l'assiduité restrict n'est pas utilisé ici. """ etat = self.etat user: User | None = None if format_api: # format api utilise les noms "present,absent,retard" au lieu des int etat = EtatAssiduite.inverse().get(self.etat).name if self.user_id is not None: user = db.session.get(User, self.user_id) data = { "assiduite_id": self.id, "etudid": self.etudid, "code_nip": self.etudiant.code_nip, "moduleimpl_id": self.moduleimpl_id, "date_debut": self.date_debut, "date_fin": self.date_fin, "etat": etat, "desc": self.description, "entry_date": self.entry_date, "user_id": None if user is None else user.id, # l'uid "user_name": None if user is None else user.user_name, # le login "user_nom_complet": None if user is None else user.get_nomcomplet(), # "Marie Dupont" "est_just": self.est_just, "external_data": self.external_data, } return data def __str__(self) -> str: "chaine pour journaux et debug (lisible par humain français)" try: etat_str = EtatAssiduite(self.etat).name.lower().capitalize() except ValueError: etat_str = "Invalide" return f"""{etat_str} { "just." if self.est_just else "non just." } de { self.date_debut.strftime("%d/%m/%Y %Hh%M") } à { self.date_fin.strftime("%d/%m/%Y %Hh%M") }""" @classmethod def create_assiduite( cls, etud: Identite, date_debut: datetime, date_fin: datetime, etat: EtatAssiduite, moduleimpl: ModuleImpl = None, description: str = None, entry_date: datetime = None, user_id: int = None, est_just: bool = False, external_data: dict = None, notify_mail=False, ) -> "Assiduite": """Créer une nouvelle assiduité pour l'étudiant. Les datetime doivent être en timezone serveur. Raises ScoValueError en cas de conflit ou erreur. """ if date_debut.tzinfo is None: log( f"Warning: create_assiduite: date_debut without timezone ({date_debut})" ) if date_fin.tzinfo is None: log(f"Warning: create_assiduite: date_fin without timezone ({date_fin})") # Vérification jours non travaillés # -> vérifie si la date de début ou la date de fin est sur un jour non travaillé # On récupère les formsemestres des dates de début et de fin formsemestre_date_debut: FormSemestre = get_formsemestre_from_data( { "etudid": etud.id, "date_debut": date_debut, "date_fin": date_debut, } ) formsemestre_date_fin: FormSemestre = get_formsemestre_from_data( { "etudid": etud.id, "date_debut": date_fin, "date_fin": date_fin, } ) if date_debut.weekday() in NonWorkDays.get_all_non_work_days( formsemestre_id=formsemestre_date_debut ): raise ScoValueError("La date de début n'est pas un jour travaillé") if date_fin.weekday() in NonWorkDays.get_all_non_work_days( formsemestre_id=formsemestre_date_fin ): raise ScoValueError("La date de fin n'est pas un jour travaillé") # Vérification de non duplication des périodes assiduites: Query = etud.assiduites if is_period_conflicting(date_debut, date_fin, assiduites, Assiduite): log( f"""create_assiduite: period_conflicting etudid={etud.id} date_debut={ date_debut} date_fin={date_fin}""" ) raise ScoValueError( "Duplication: la période rentre en conflit avec une plage enregistrée" ) if not est_just: est_just = ( len( get_justifs_from_date(etud.etudid, date_debut, date_fin, valid=True) ) > 0 ) moduleimpl_id = None if moduleimpl is not None: # Vérification de l'inscription de l'étudiant if moduleimpl.est_inscrit(etud): moduleimpl_id = moduleimpl.id else: raise ScoValueError("L'étudiant n'est pas inscrit au module") elif not ( external_data is not None and external_data.get("module") is not None ): # Vérification si module forcé formsemestre: FormSemestre = get_formsemestre_from_data( {"etudid": etud.id, "date_debut": date_debut, "date_fin": date_fin} ) force: bool if formsemestre: force = is_assiduites_module_forced(formsemestre_id=formsemestre.id) else: force = is_assiduites_module_forced(dept_id=etud.dept_id) if force: raise ScoValueError("Module non renseigné") nouv_assiduite = Assiduite( date_debut=date_debut, date_fin=date_fin, description=description, entry_date=entry_date, est_just=est_just, etat=etat, etudiant=etud, external_data=external_data, moduleimpl_id=moduleimpl_id, user_id=user_id, ) db.session.add(nouv_assiduite) db.session.flush() log(f"create_assiduite: {etud.id} id={nouv_assiduite.id} {nouv_assiduite}") Scolog.logdb( method="create_assiduite", etudid=etud.id, msg=f"assiduité: {nouv_assiduite}", ) if notify_mail and etat == EtatAssiduite.ABSENT: sco_abs_notification.abs_notify(etud.id, nouv_assiduite.date_debut) return nouv_assiduite def set_moduleimpl(self, moduleimpl_id: int | str): """Mise à jour du moduleimpl_id Les valeurs du champ "moduleimpl_id" possibles sont : - <int> (un id classique) - <str> ("autre" ou "<id>") - "" (pas de moduleimpl_id) Si la valeur est "autre" il faut: - mettre à None assiduité.moduleimpl_id - mettre à jour assiduite.external_data["module"] = "autre" En fonction de la configuration du semestre (option force_module) la valeur "" peut-être considérée comme invalide. - Il faudra donc vérifier que ce n'est pas le cas avant de mettre à jour l'assiduité """ moduleimpl: ModuleImpl = None if moduleimpl_id == "autre": # Configuration de external_data pour Module Autre # Si self.external_data None alors on créé un dictionnaire {"module": "autre"} # Sinon on met à jour external_data["module"] à "autre" if self.external_data is None: self.external_data = {"module": "autre"} else: self.external_data["module"] = "autre" # Dans tous les cas une fois fait, assiduite.moduleimpl_id doit être None self.moduleimpl_id = None # Ici pas de vérification du force module car on l'a mis dans "external_data" return if moduleimpl_id != "": try: moduleimpl_id = int(moduleimpl_id) except ValueError as exc: raise ScoValueError("Module non reconnu") from exc moduleimpl: ModuleImpl = ModuleImpl.query.get(moduleimpl_id) # ici moduleimpl est None si non spécifié # Vérification ModuleImpl not None (raise ScoValueError) if moduleimpl is None: self._check_force_module() # Ici uniquement si on est autorisé à ne pas avoir de module self.moduleimpl_id = None return # Vérification Inscription ModuleImpl (raise ScoValueError) if moduleimpl.est_inscrit(self.etudiant): self.moduleimpl_id = moduleimpl.id else: raise ScoValueError("L'étudiant n'est pas inscrit au module") def supprime(self): "Supprime l'assiduité. Log et commit." from app.scodoc import sco_assiduites as scass if g.scodoc_dept is None and self.etudiant.dept_id is not None: # route sans département set_sco_dept(self.etudiant.departement.acronym) obj_dict: dict = self.to_dict() # Suppression de l'objet et LOG log(f"delete_assidutite: {self.etudiant.id} {self}") Scolog.logdb( method="delete_assiduite", etudid=self.etudiant.id, msg=f"Assiduité: {self}", ) db.session.delete(self) db.session.commit() # Invalidation du cache scass.simple_invalidate_cache(obj_dict) def get_formsemestre(self) -> FormSemestre: """Le formsemestre associé. Attention: en cas d'inscription multiple prend arbitrairement l'un des semestres. A utiliser avec précaution ! """ return get_formsemestre_from_data(self.to_dict()) def get_module(self, traduire: bool = False) -> int | str: "TODO documenter" if self.moduleimpl_id is not None: if traduire: modimpl: ModuleImpl = ModuleImpl.query.get(self.moduleimpl_id) mod: Module = Module.query.get(modimpl.module_id) return f"{mod.code} {mod.titre}" elif self.external_data is not None and "module" in self.external_data: return ( "Tout module" if self.external_data["module"] == "Autre" else self.external_data["module"] ) return "Non spécifié" if traduire else None def get_saisie(self) -> str: """ retourne le texte "saisie le <date> par <User>" """ date: str = self.entry_date.strftime("%d/%m/%Y à %H:%M") utilisateur: str = "" if self.user != None: self.user: User utilisateur = f"par {self.user.get_prenomnom()}" return f"saisie le {date} {utilisateur}" def _check_force_module(self): """Vérification si module forcé: Si le module est requis, raise ScoValueError sinon ne fait rien. """ # cherche le formsemestre affecté pour utiliser ses préférences formsemestre: FormSemestre = get_formsemestre_from_data( { "etudid": self.etudid, "date_debut": self.date_debut, "date_fin": self.date_fin, } ) formsemestre_id = formsemestre.id if formsemestre else None # si pas de formsemestre, utilisera les prefs globales du département dept_id = self.etudiant.dept_id force = is_assiduites_module_forced( formsemestre_id=formsemestre_id, dept_id=dept_id ) if force: raise ScoValueError("Module non renseigné") class Justificatif(ScoDocModel): """ Représente un justificatif: - une plage horaire lié à un état et un étudiant - une raison si spécifiée - un fichier si spécifié """ __tablename__ = "justificatifs" id = db.Column(db.Integer, primary_key=True) justif_id = db.synonym("id") date_debut = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) date_fin = db.Column( db.DateTime(timezone=True), server_default=db.func.now(), nullable=False ) etudid = db.Column( db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE"), index=True, nullable=False, ) etat = db.Column( db.Integer, nullable=False, ) entry_date = db.Column(db.DateTime(timezone=True), server_default=db.func.now()) "date de création de l'élément: date de saisie" # pourrait devenir date de dépot au secrétariat, si différente user_id = db.Column( db.Integer, db.ForeignKey("user.id", ondelete="SET NULL"), nullable=True, index=True, ) raison = db.Column(db.Text()) # Archive_id -> sco_archives_justificatifs.py fichier = db.Column(db.Text()) # Déclare la relation "joined" car on va très souvent vouloir récupérer # l'étudiant en même tant que le justificatif (perf.: évite nouvelle requete SQL) etudiant = db.relationship( "Identite", back_populates="justificatifs", lazy="joined" ) # En revanche, user est rarement accédé: user = db.relationship( "User", backref=db.backref( "justificatifs", lazy="select", order_by="Justificatif.entry_date" ), lazy="select", ) external_data = db.Column(db.JSON, nullable=True) @classmethod def get_justificatif(cls, justif_id: int) -> "Justificatif": """Justificatif ou 404, cherche uniquement dans le département courant""" query = Justificatif.query.filter_by(id=justif_id) if g.scodoc_dept: query = query.join(Identite).filter_by(dept_id=g.scodoc_dept_id) return query.first_or_404() def to_dict(self, format_api: bool = False, restrict: bool = False) -> dict: """L'objet en dictionnaire sérialisable. Si restrict, ne donne par la raison et les fichiers et external_data """ etat = self.etat user: User = self.user if self.user_id is not None else None if format_api: etat = EtatJustificatif.inverse().get(self.etat).name data = { "justif_id": self.justif_id, "etudid": self.etudid, "code_nip": self.etudiant.code_nip, "date_debut": self.date_debut, "date_fin": self.date_fin, "etat": etat, "raison": None if restrict else self.raison, "fichier": None if restrict else self.fichier, "entry_date": self.entry_date, "user_id": None if user is None else user.id, # l'uid "user_name": None if user is None else user.user_name, # le login "user_nom_complet": None if user is None else user.get_nomcomplet(), "external_data": None if restrict else self.external_data, } return data def __repr__(self) -> str: "chaine pour journaux et debug (lisible par humain français)" try: etat_str = EtatJustificatif(self.etat).name except ValueError: etat_str = "Invalide" return f"""Justificatif id={self.id} {etat_str} de { self.date_debut.strftime("%d/%m/%Y %Hh%M") } à { self.date_fin.strftime("%d/%m/%Y %Hh%M") }""" @classmethod def convert_dict_fields(cls, args: dict) -> dict: """Convert fields. Called by ScoDocModel's create_from_dict, edit and from_dict Raises ScoValueError si paramètres incorrects. """ if not isinstance(args["date_debut"], datetime) or not isinstance( args["date_fin"], datetime ): raise ScoValueError("type date incorrect") if args["date_fin"] <= args["date_debut"]: raise ScoValueError("dates incompatibles") if args["entry_date"] and not isinstance(args["entry_date"], datetime): raise ScoValueError("type entry_date incorrect") return args @classmethod def create_justificatif( cls, etudiant: Identite, date_debut: datetime, date_fin: datetime, etat: EtatJustificatif, raison: str = None, entry_date: datetime = None, user_id: int = None, external_data: dict = None, ) -> "Justificatif": """Créer un nouveau justificatif pour l'étudiant. Raises ScoValueError si paramètres incorrects. """ nouv_justificatif = cls.create_from_dict(locals()) db.session.commit() log(f"create_justificatif: etudid={etudiant.id} {nouv_justificatif}") Scolog.logdb( method="create_justificatif", etudid=etudiant.id, msg=f"justificatif: {nouv_justificatif}", ) return nouv_justificatif def supprime(self): "Supprime le justificatif. Log et commit." from app.scodoc import sco_assiduites as scass from app.scodoc.sco_archives_justificatifs import JustificatifArchiver # Récupération de l'archive du justificatif archive_name: str = self.fichier if archive_name is not None: # Si elle existe : on essaye de la supprimer archiver: JustificatifArchiver = JustificatifArchiver() try: archiver.delete_justificatif(self.etudiant, archive_name) except ValueError: pass if g.scodoc_dept is None and self.etudiant.dept_id is not None: # route sans département set_sco_dept(self.etudiant.departement.acronym) # On invalide le cache scass.simple_invalidate_cache(self.to_dict()) # Suppression de l'objet et LOG log(f"delete_justificatif: {self.etudiant.id} {self}") Scolog.logdb( method="delete_justificatif", etudid=self.etudiant.id, msg=f"Justificatif: {self}", ) db.session.delete(self) db.session.commit() # On actualise les assiduités justifiées de l'étudiant concerné compute_assiduites_justified( self.etudid, Justificatif.query.filter_by(etudid=self.etudid).all(), True, ) def get_fichiers(self) -> tuple[list[str], int]: """Renvoie la liste des noms de fichiers justicatifs accessibles par l'utilisateur courant et le nombre total de fichiers. (ces fichiers sont dans l'archive associée) """ if self.fichier is None: return [], 0 archive_name: str = self.fichier archiver: JustificatifArchiver = JustificatifArchiver() filenames = archiver.list_justificatifs(archive_name, self.etudiant) accessible_filenames = [] # for filename in filenames: if int(filename[1]) == current_user.id or current_user.has_permission( Permission.AbsJustifView ): accessible_filenames.append(filename[0]) return accessible_filenames, len(filenames) def is_period_conflicting( date_debut: datetime, date_fin: datetime, collection: Query, collection_cls: Assiduite | Justificatif, ) -> bool: """ Vérifie si une date n'entre pas en collision avec les justificatifs ou assiduites déjà présentes """ # On s'assure que les dates soient avec TimeZone date_debut = localize_datetime(date_debut) date_fin = localize_datetime(date_fin) count: int = collection.filter( collection_cls.date_debut < date_fin, collection_cls.date_fin > date_debut ).count() return count > 0 def compute_assiduites_justified( etudid: int, justificatifs: list[Justificatif] = None, reset: bool = False ) -> list[int]: """ Args: etudid (int): l'identifiant de l'étudiant justificatifs (list[Justificatif]): La liste des justificatifs qui seront utilisés reset (bool, optional): remet à false les assiduites non justifiés. Defaults to False. Returns: list[int]: la liste des assiduités qui ont été justifiées. """ # TODO à optimiser (car très long avec 40000 assiduités) # On devrait : # - récupérer uniquement les assiduités qui sont sur la période des justificatifs donnés # - Pour chaque assiduité trouvée, il faut récupérer les justificatifs qui la justifie # - Si au moins un justificatif valide couvre la période de l'assiduité alors on la justifie # Si on ne donne pas de justificatifs on prendra par défaut tous les justificatifs de l'étudiant if justificatifs is None: justificatifs: list[Justificatif] = Justificatif.query.filter_by( etudid=etudid ).all() # On ne prend que les justificatifs valides justificatifs = [j for j in justificatifs if j.etat == EtatJustificatif.VALIDE] # On récupère les assiduités de l'étudiant assiduites: Assiduite = Assiduite.query.filter_by(etudid=etudid) assiduites_justifiees: list[int] = [] for assi in assiduites: # On ne justifie pas les Présences if assi.etat == EtatAssiduite.PRESENT: continue # On récupère les justificatifs qui justifient l'assiduité `assi` assi_justificatifs = Justificatif.query.filter( Justificatif.etudid == assi.etudid, Justificatif.date_debut <= assi.date_debut, Justificatif.date_fin >= assi.date_fin, Justificatif.etat == EtatJustificatif.VALIDE, ).all() # Si au moins un justificatif possède une période qui couvre l'assiduité if any( assi.date_debut >= j.date_debut and assi.date_fin <= j.date_fin for j in justificatifs + assi_justificatifs ): # On justifie l'assiduité # On ajoute l'id de l'assiduité à la liste des assiduités justifiées assi.est_just = True assiduites_justifiees.append(assi.assiduite_id) db.session.add(assi) elif reset: # Si le paramètre reset est Vrai alors les assiduités non justifiées # sont remise en "non justifiée" assi.est_just = False db.session.add(assi) # On valide la session db.session.commit() # On renvoie la liste des assiduite_id des assiduités justifiées return assiduites_justifiees def get_assiduites_justif(assiduite_id: int, long: bool) -> list[int | dict]: """ get_assiduites_justif Récupération des justificatifs d'une assiduité Args: assiduite_id (int): l'identifiant de l'assiduité long (bool): Retourner des dictionnaires à la place des identifiants des justificatifs Returns: list[int | dict]: La liste des justificatifs (par défaut uniquement les identifiants, sinon les dict si long est vrai) """ assi: Assiduite = Assiduite.query.get_or_404(assiduite_id) return get_justifs_from_date(assi.etudid, assi.date_debut, assi.date_fin, long) def get_justifs_from_date( etudid: int, date_debut: datetime, date_fin: datetime, long: bool = False, valid: bool = False, ) -> list[int | dict]: """ get_justifs_from_date Récupération des justificatifs couvrant une période pour un étudiant donné Args: etudid (int): l'identifiant de l'étudiant date_debut (datetime): la date de début (datetime avec timezone) date_fin (datetime): la date de fin (datetime avec timezone) long (bool, optional): Définition de la sortie. Vrai pour avoir les dictionnaires des justificatifs. Faux pour avoir uniquement les identifiants Defaults to False. valid (bool, optional): Filtre pour n'avoir que les justificatifs valide. Si vrai : le retour ne contiendra que des justificatifs valides Sinon le retour contiendra tout type de justificatifs Defaults to False. Returns: list[int | dict]: La liste des justificatifs (par défaut uniquement les identifiants, sinon les dict si long est vrai) """ # On récupère les justificatifs d'un étudiant couvrant la période donnée justifs: Query = Justificatif.query.filter( Justificatif.etudid == etudid, Justificatif.date_debut <= date_debut, Justificatif.date_fin >= date_fin, ) # si valide est vrai alors on filtre pour n'avoir que les justificatifs valide if valid: justifs = justifs.filter(Justificatif.etat == EtatJustificatif.VALIDE) # On renvoie la liste des id des justificatifs si long est Faux, # sinon on renvoie les dicts des justificatifs if long: return [j.to_dict(True) for j in justifs] return [j.justif_id for j in justifs] def get_formsemestre_from_data(data: dict[str, datetime | int]) -> FormSemestre: """ get_formsemestre_from_data récupère un formsemestre en fonction des données passées Si l'étudiant est inscrit à plusieurs formsemestre, prend le premier. Args: data (dict[str, datetime | int]): Une représentation simplifiée d'une assiduité ou d'un justificatif data = { "etudid" : int, "date_debut": datetime (tz), "date_fin": datetime (tz), } Returns: FormSemestre: Le formsemestre trouvé ou None """ return ( FormSemestre.query.join( FormSemestreInscription, FormSemestre.id == FormSemestreInscription.formsemestre_id, ) .filter( data["date_debut"] <= FormSemestre.date_fin, data["date_fin"] >= FormSemestre.date_debut, FormSemestreInscription.etudid == data["etudid"], ) .first() )