# -*- coding: UTF-8 -*
##############################################################################
# ScoDoc
# Copyright (c) 1999 - 2024 Emmanuel Viennet.  All rights reserved.
# See LICENSE
##############################################################################

"""ScoDoc models: Groups & partitions"""

from operator import attrgetter

from sqlalchemy.exc import IntegrityError

from app import db, log
from app.models import ScoDocModel, GROUPNAME_STR_LEN, SHORT_STR_LEN
from app.models.etudiants import Identite
from app.models.events import Scolog
from app.scodoc import sco_cache
from app.scodoc import sco_utils as scu
from app.scodoc.sco_exceptions import AccessDenied, ScoValueError


class Partition(ScoDocModel):
    """Partition: a way of splitting the students of a formsemestre into groups."""

    __table_args__ = (db.UniqueConstraint("formsemestre_id", "partition_name"),)

    id = db.Column(db.Integer, primary_key=True)
    partition_id = db.synonym("id")
    formsemestre_id = db.Column(
        db.Integer,
        db.ForeignKey("notes_formsemestre.id"),
        index=True,
    )
    # "TD", "TP", ... (NULL for 'all')
    partition_name = db.Column(db.String(SHORT_STR_LEN))
    # numero = display order
    numero = db.Column(db.Integer, nullable=False, default=0)
    # Compute the rank?
    bul_show_rank = db.Column(
        db.Boolean(), nullable=False, default=False, server_default="false"
    )
    # Show this partition when listing a student's groups?
    show_in_lists = db.Column(
        db.Boolean(), nullable=False, default=True, server_default="true"
    )
    # Editable (create/rename groups)? (false for the parcours groups)
    groups_editable = db.Column(
        db.Boolean(), nullable=False, default=True, server_default="true"
    )
    groups = db.relationship(
        "GroupDescr",
        backref=db.backref("partition", lazy=True),
        lazy="dynamic",
        cascade="all, delete-orphan",
        order_by="GroupDescr.numero, GroupDescr.group_name",
    )
    _sco_dept_relations = ("FormSemestre",)

    def __init__(self, **kwargs):
        """Create a partition; if no `numero` is given, generate one.

        The generated value is one more than the highest `numero` returned
        by a query over all Partition rows (the query is not restricted to
        a formsemestre), or 1 if there is no partition yet.
        """
        super().__init__(**kwargs)
        if self.numero is None:
            # generate numero at creation time
            last_partition = Partition.query.order_by(Partition.numero.desc()).first()
            if last_partition:
                self.numero = last_partition.numero + 1
            else:
                self.numero = 1

    def __repr__(self):
        return f"""<{self.__class__.__name__} {self.id} "{self.partition_name or '(default)'}">"""

    @classmethod
    def check_name(
        cls, formsemestre: "FormSemestre", partition_name: str, existing=False
    ) -> bool:
        """Check whether a partition named `partition_name` can be created
        in the given formsemestre.

        Returns False if the name is not a string, if its stripped length is
        not strictly between 0 and SHORT_STR_LEN, or (unless `existing` is
        true) if a partition of the formsemestre already has this name.
        If `existing` is true, a name already used in the formsemestre is
        allowed.
        """
        if not isinstance(partition_name, str):
            return False
        if not (0 < len(partition_name.strip()) < SHORT_STR_LEN):
            return False
        if (not existing) and (
            partition_name in [p.partition_name for p in formsemestre.partitions]
        ):
            return False
        return True

    @classmethod
    def formsemestre_remove_etud(cls, formsemestre_id: int, etud: "Identite"):
        "Remove the student from all the groups of every partition of this formsemestre."
        for group in GroupDescr.query.join(Partition).filter_by(
            formsemestre_id=formsemestre_id
        ):
            group.remove_etud(etud)

    def is_default(self) -> bool:
        "True if this is the default partition (all students): empty or NULL name."
        return not self.partition_name

    def is_parcours(self) -> bool:
        "True if this is the parcours partition (name equals scu.PARTITION_PARCOURS)."
        return self.partition_name == scu.PARTITION_PARCOURS

    def to_dict(self, with_groups=False, str_keys: bool = False) -> dict:
        """Return the partition as a dict, with or without its groups.

        The dict is a copy of the instance `__dict__` without the SQLAlchemy
        state and without the "formsemestre" entry, plus a "partition_id" key.
        If `with_groups`, a "groups" entry maps each group id to the group's
        dict (sorted by numero, then group_name).
        If `str_keys`, the group ids used as keys are converted to strings
        (useful for JSON).
        """
        d = dict(self.__dict__)
        d["partition_id"] = self.id
        d.pop("_sa_instance_state", None)
        d.pop("formsemestre", None)
        if with_groups:
            groups = sorted(self.groups, key=attrgetter("numero", "group_name"))
            # a dict rather than a list, for JSON
            if str_keys:
                d["groups"] = {
                    str(group.id): group.to_dict(with_partition=False)
                    for group in groups
                }
            else:
                d["groups"] = {
                    group.id: group.to_dict(with_partition=False) for group in groups
                }
        return d

    def get_etud_group(self, etudid: int) -> "GroupDescr":
        "The group of the student in this partition, or None if they are in none."
        return (
            GroupDescr.query.filter_by(partition_id=self.id)
            .join(group_membership)
            .filter_by(etudid=etudid)
            .first()
        )

    def set_etud_group(self, etud: "Identite", group: "GroupDescr") -> bool:
        """Put the student `etud` in `group`, within this partition.

        If the student already belongs to another group of this partition,
        they are moved from that group to `group`.

        Raises ScoValueError if `group` is not one of this partition's groups
        or if the student is not among the students of the partition's
        formsemestre. On IntegrityError, the session is rolled back and the
        exception is re-raised.

        Returns True if a change was made, False if the student was already
        in this group.
        """
        if group.id not in (g.id for g in self.groups):
            raise ScoValueError(
                f"""Le groupe {group.id} n'est pas dans la partition {self.partition_name or "tous"}"""
            )
        if etud.id not in (e.id for e in self.formsemestre.etuds):
            raise ScoValueError(
                f"""étudiant {etud.nomprenom} non inscrit au formsemestre du groupe {group.group_name}"""
            )
        try:
            existing_row = (
                db.session.query(group_membership)
                .filter_by(etudid=etud.id)
                .join(GroupDescr)
                .filter_by(partition_id=self.id)
                .first()
            )
            if existing_row:
                # group_membership columns are (etudid, group_id)
                existing_group_id = existing_row[1]
                if group.id == existing_group_id:
                    return False
                # Make the change through the ORM, otherwise there is a high
                # risk of blocking
                existing_group = db.session.get(GroupDescr, existing_group_id)
                db.session.commit()
                group.etuds.append(etud)
                existing_group.etuds.remove(etud)
                db.session.add(etud)
                db.session.add(existing_group)
                db.session.add(group)
            else:
                new_row = group_membership.insert().values(
                    etudid=etud.id, group_id=group.id
                )
                db.session.execute(new_row)
            db.session.commit()
        except IntegrityError:
            db.session.rollback()
            raise
        return True

    def create_group(self, group_name="", default=False) -> "GroupDescr":
        """Create a group in this partition, commit, and return it.

        The new group's numero is one more than the highest numero among the
        partition's groups (0 if the partition has no group yet).

        Raises AccessDenied if the formsemestre does not allow changing
        groups, ValueError if the name is empty and `default` is false, and
        ScoValueError if GroupDescr.check_name rejects the name.
        """
        if not self.formsemestre.can_change_groups():
            raise AccessDenied(
                "Vous n'avez pas le droit d'effectuer cette opération, "
                "ou bien le semestre est verrouillé !"
            )
        if group_name:
            group_name = group_name.strip()
        if not group_name and not default:
            raise ValueError("invalid group name: ()")
        if not GroupDescr.check_name(self, group_name, default=default):
            raise ScoValueError(
                f"Le groupe {group_name} existe déjà dans cette partition"
            )
        # groups without a numero count as 0; an empty partition starts at 0
        new_numero = (
            max(
                (g.numero if g.numero is not None else 0 for g in self.groups),
                default=-1,
            )
            + 1
        )
        group = GroupDescr(partition=self, group_name=group_name, numero=new_numero)
        db.session.add(group)
        db.session.commit()
        log(f"create_group: created group_id={group.id}")
        #
        return group


class GroupDescr(ScoDocModel):
    """Description of a group belonging to a partition."""

    __tablename__ = "group_descr"
    __table_args__ = (db.UniqueConstraint("partition_id", "group_name"),)

    id = db.Column(db.Integer, primary_key=True)
    group_id = db.synonym("id")
    partition_id = db.Column(db.Integer, db.ForeignKey("partition.id"))
    group_name = db.Column(db.String(GROUPNAME_STR_LEN))
    """group name: "A", "C2", ... (NULL for 'all')"""
    edt_id: str | None = db.Column(db.Text(), index=True, nullable=True)
    "timetable identifier (uniqueness is not enforced)"
    numero = db.Column(db.Integer, nullable=False, default=0)
    "numero = display order"
    _sco_dept_relations = (
        "Partition",
        "FormSemestre",
    )

    etuds = db.relationship(
        "Identite",
        secondary="group_membership",
        lazy="dynamic",
    )

    def __repr__(self):
        return (
            f"""<{self.__class__.__name__} {self.id} "{self.group_name or '(tous)'}">"""
        )

    @classmethod
    def filter_model_attributes(
        cls, data: dict, excluded: set[str] | None = None
    ) -> dict:
        """Return a copy of `data` with only the keys belonging to the model
        and not in `excluded`.

        `partition_id` is always excluded: a group cannot be moved from one
        partition to another.
        """
        return super().filter_model_attributes(
            data,
            excluded=(excluded or set()) | {"partition_id"},
        )

    def get_nom_with_part(self, default="-") -> str:
        """Group name prefixed by its partition name, e.g. 'TD A'.

        If the partition name is None (default partition, all students),
        return `default` instead.
        """
        if self.partition.partition_name is None:
            return default
        return f"{self.partition.partition_name or ''} {self.group_name or '-'}"

    def to_dict(self, with_partition=True) -> dict:
        """Return the group as a dict, with or without its partition.

        If `with_partition`, the "partition" entry holds the partition's
        dict (without its groups).
        """
        # The partition dict is built before copying __dict__ (order kept
        # from the original code).
        if with_partition:
            partition_dict = self.partition.to_dict(with_groups=False)
        d = dict(self.__dict__)
        d.pop("_sa_instance_state", None)
        if with_partition:
            d["partition"] = partition_dict
        return d

    def get_edt_ids(self) -> list[str]:
        """Normalized ids for the timetable.

        Uses the ids obtained by splitting `edt_id`; when that yields nothing,
        falls back to the ScoDoc group name.
        """
        # NOTE(review): when edt_id yields nothing and group_name is None,
        # normalize_edt_id receives None (a one-element list is always
        # truthy, so no empty-list fallback can ever apply) — confirm this is
        # the intended result for the default group.
        return [
            scu.normalize_edt_id(x)
            for x in scu.split_id(self.edt_id) or [self.group_name]
        ]

    def get_nb_inscrits(self) -> int:
        """Number of students who are in this group and also have an
        inscription to the partition's formsemestre.

        This is needed because, when a student is unenrolled, their group
        memberships are kept to make a possible re-enrollment easier.
        """
        # local import (presumably to avoid a circular import — TODO confirm)
        from app.models.formsemestre import FormSemestreInscription

        return (
            Identite.query.join(group_membership)
            .filter_by(group_id=self.id)
            .join(FormSemestreInscription)
            .filter_by(formsemestre_id=self.partition.formsemestre.id)
            .count()
        )

    @classmethod
    def check_name(
        cls, partition: "Partition", group_name: str, existing=False, default=False
    ) -> bool:
        """Check whether a group named `group_name` can be created in the
        given partition.

        Returns False if the name is not a string, if (unless `default`) its
        stripped length is not strictly between 0 and GROUPNAME_STR_LEN, or
        (unless `existing`) if a group of the partition already has this name.
        If `existing` is true, a name already used in the partition is allowed.
        If `default`, the length check is skipped (the default group has an
        empty name).
        """
        if not isinstance(group_name, str):
            return False
        if not default and not (0 < len(group_name.strip()) < GROUPNAME_STR_LEN):
            return False
        if (not existing) and (group_name in [g.group_name for g in partition.groups]):
            return False
        return True

    def set_name(self, group_name: str, dest_url: str = None):
        """Set the group name, commit the session and invalidate the
        formsemestre cache.

        Raises AccessDenied if the formsemestre does not allow changing
        groups or if the partition is not `groups_editable`; ValueError if
        this group's current name is None (default group); ScoValueError if
        the new name is empty or rejected by check_name.
        `dest_url` is passed to ScoValueError (used for error messages).
        """
        if not self.partition.formsemestre.can_change_groups():
            raise AccessDenied("Vous n'avez pas le droit d'effectuer cette opération !")
        if self.group_name is None:
            raise ValueError("can't set a name to default group")
        if not self.partition.groups_editable:
            raise AccessDenied("Partition non éditable")
        if group_name:
            group_name = group_name.strip()
        if not group_name:
            raise ScoValueError("nom de groupe vide !", dest_url=dest_url)
        if group_name != self.group_name and not GroupDescr.check_name(
            self.partition, group_name
        ):
            raise ScoValueError(
                "Le nom de groupe existe déjà dans la partition", dest_url=dest_url
            )

        self.group_name = group_name
        db.session.add(self)
        db.session.commit()
        sco_cache.invalidate_formsemestre(
            formsemestre_id=self.partition.formsemestre_id
        )

    def set_edt_id(self, edt_id: str):
        """Set edt_id and commit the session.

        A string value is stripped; an empty result is stored as None.
        Raises AccessDenied if the formsemestre does not allow changing groups.
        """
        if not self.partition.formsemestre.can_change_groups():
            raise AccessDenied("Vous n'avez pas le droit d'effectuer cette opération !")
        if isinstance(edt_id, str):
            edt_id = edt_id.strip() or None
        self.edt_id = edt_id
        db.session.add(self)
        db.session.commit()

    def remove_etud(self, etud: "Identite"):
        """Remove the student from this group if they belong to it
        (does nothing otherwise).

        Logs the removal, commits, updates the parcours inscriptions when
        this group belongs to the parcours partition, and invalidates the
        formsemestre cache.
        """
        if etud in self.etuds:
            self.etuds.remove(etud)
            Scolog.logdb(
                method="group_remove_etud",
                etudid=etud.id,
                msg=f"Retrait du groupe {self.group_name} de {self.partition.partition_name}",
            )
            db.session.commit()
            # Update parcours
            if self.partition.partition_name == scu.PARTITION_PARCOURS:
                self.partition.formsemestre.update_inscriptions_parcours_from_groups(
                    etudid=etud.id
                )
            sco_cache.invalidate_formsemestre(self.partition.formsemestre_id)


# Association table between students (identite) and groups (group_descr).
group_membership = db.Table(
    "group_membership",
    db.Column("etudid", db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE")),
    db.Column("group_id", db.Integer, db.ForeignKey("group_descr.id")),
    db.UniqueConstraint("etudid", "group_id"),
)
# class GroupMembership(db.Model):
#     """Association groupe / étudiant"""

#     __tablename__ = "group_membership"
#     __table_args__ = (db.UniqueConstraint("etudid", "group_id"),)
#     id = db.Column(db.Integer, primary_key=True)
#     etudid = db.Column(db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE"))
#     group_id = db.Column(db.Integer, db.ForeignKey("group_descr.id"))