359 lines
14 KiB
Python
359 lines
14 KiB
Python
# -*- coding: UTF-8 -*
|
|
##############################################################################
|
|
# ScoDoc
|
|
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
|
|
# See LICENSE
|
|
##############################################################################
|
|
|
|
"""ScoDoc models: Groups & partitions
|
|
"""
|
|
from operator import attrgetter
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
from app import db, log
|
|
from app.models import ScoDocModel, GROUPNAME_STR_LEN, SHORT_STR_LEN
|
|
from app.models.etudiants import Identite
|
|
from app.models.events import Scolog
|
|
from app.scodoc import sco_cache
|
|
from app.scodoc import sco_utils as scu
|
|
from app.scodoc.sco_exceptions import AccessDenied, ScoValueError
|
|
|
|
|
|
class Partition(db.Model, ScoDocModel):
|
|
"""Partition: découpage d'une promotion en groupes"""
|
|
|
|
__table_args__ = (db.UniqueConstraint("formsemestre_id", "partition_name"),)
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
partition_id = db.synonym("id")
|
|
formsemestre_id = db.Column(
|
|
db.Integer,
|
|
db.ForeignKey("notes_formsemestre.id"),
|
|
index=True,
|
|
)
|
|
# "TD", "TP", ... (NULL for 'all')
|
|
partition_name = db.Column(db.String(SHORT_STR_LEN))
|
|
# Numero = ordre de presentation)
|
|
numero = db.Column(db.Integer, nullable=False, default=0)
|
|
# Calculer le rang ?
|
|
bul_show_rank = db.Column(
|
|
db.Boolean(), nullable=False, default=False, server_default="false"
|
|
)
|
|
# Montrer quand on indique les groupes de l'étudiant ?
|
|
show_in_lists = db.Column(
|
|
db.Boolean(), nullable=False, default=True, server_default="true"
|
|
)
|
|
# Editable (créer/renommer groupes) ? (faux pour les groupes de parcours)
|
|
groups_editable = db.Column(
|
|
db.Boolean(), nullable=False, default=True, server_default="true"
|
|
)
|
|
groups = db.relationship(
|
|
"GroupDescr",
|
|
backref=db.backref("partition", lazy=True),
|
|
lazy="dynamic",
|
|
cascade="all, delete-orphan",
|
|
order_by="GroupDescr.numero, GroupDescr.group_name",
|
|
)
|
|
|
|
def __init__(self, **kwargs):
|
|
super(Partition, self).__init__(**kwargs)
|
|
if self.numero is None:
|
|
# génère numero à la création
|
|
last_partition = Partition.query.order_by(Partition.numero.desc()).first()
|
|
if last_partition:
|
|
self.numero = last_partition.numero + 1
|
|
else:
|
|
self.numero = 1
|
|
|
|
def __repr__(self):
|
|
return f"""<{self.__class__.__name__} {self.id} "{self.partition_name or '(default)'}">"""
|
|
|
|
@classmethod
|
|
def check_name(
|
|
cls, formsemestre: "FormSemestre", partition_name: str, existing=False
|
|
) -> bool:
|
|
"""check if a partition named 'partition_name' can be created in the given formsemestre.
|
|
If existing is True, allow a partition_name already existing in the formsemestre.
|
|
"""
|
|
if not isinstance(partition_name, str):
|
|
return False
|
|
if not (0 < len(partition_name.strip()) < SHORT_STR_LEN):
|
|
return False
|
|
if (not existing) and (
|
|
partition_name in [p.partition_name for p in formsemestre.partitions]
|
|
):
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def formsemestre_remove_etud(cls, formsemestre_id: int, etud: "Identite"):
|
|
"retire l'étudiant de toutes les partitions de ce semestre"
|
|
for group in GroupDescr.query.join(Partition).filter_by(
|
|
formsemestre_id=formsemestre_id
|
|
):
|
|
group.remove_etud(etud)
|
|
|
|
def is_parcours(self) -> bool:
|
|
"Vrai s'il s'agit de la partition de parcours"
|
|
return self.partition_name == scu.PARTITION_PARCOURS
|
|
|
|
def to_dict(self, with_groups=False, str_keys: bool = False) -> dict:
|
|
"""as a dict, with or without groups.
|
|
If str_keys, convert integer dict keys to strings (useful for JSON)
|
|
"""
|
|
d = dict(self.__dict__)
|
|
d["partition_id"] = self.id
|
|
d.pop("_sa_instance_state", None)
|
|
d.pop("formsemestre", None)
|
|
|
|
if with_groups:
|
|
groups = sorted(self.groups, key=attrgetter("numero", "group_name"))
|
|
# un dict et non plus une liste, pour JSON
|
|
if str_keys:
|
|
d["groups"] = {
|
|
str(group.id): group.to_dict(with_partition=False)
|
|
for group in groups
|
|
}
|
|
else:
|
|
d["groups"] = {
|
|
group.id: group.to_dict(with_partition=False) for group in groups
|
|
}
|
|
return d
|
|
|
|
def get_etud_group(self, etudid: int) -> "GroupDescr":
|
|
"Le groupe de l'étudiant dans cette partition, ou None si pas présent"
|
|
return (
|
|
GroupDescr.query.filter_by(partition_id=self.id)
|
|
.join(group_membership)
|
|
.filter_by(etudid=etudid)
|
|
.first()
|
|
)
|
|
|
|
def set_etud_group(self, etud: "Identite", group: "GroupDescr") -> bool:
|
|
"""Affect etudid to group_id in given partition.
|
|
Raises IntegrityError si conflit,
|
|
or ValueError si ce group_id n'est pas dans cette partition
|
|
ou que l'étudiant n'est pas inscrit au semestre.
|
|
Return True si changement, False s'il était déjà dans ce groupe.
|
|
"""
|
|
if not group.id in (g.id for g in self.groups):
|
|
raise ScoValueError(
|
|
f"""Le groupe {group.id} n'est pas dans la partition {
|
|
self.partition_name or "tous"}"""
|
|
)
|
|
if etud.id not in (e.id for e in self.formsemestre.etuds):
|
|
raise ScoValueError(
|
|
f"""étudiant {etud.nomprenom} non inscrit au formsemestre du groupe {
|
|
group.group_name}"""
|
|
)
|
|
try:
|
|
existing_row = (
|
|
db.session.query(group_membership)
|
|
.filter_by(etudid=etud.id)
|
|
.join(GroupDescr)
|
|
.filter_by(partition_id=self.id)
|
|
.first()
|
|
)
|
|
if existing_row:
|
|
existing_group_id = existing_row[1]
|
|
if group.id == existing_group_id:
|
|
return False
|
|
# Fait le changement avec l'ORM sinon risque élevé de blocage
|
|
existing_group = db.session.get(GroupDescr, existing_group_id)
|
|
db.session.commit()
|
|
group.etuds.append(etud)
|
|
existing_group.etuds.remove(etud)
|
|
db.session.add(etud)
|
|
db.session.add(existing_group)
|
|
db.session.add(group)
|
|
else:
|
|
new_row = group_membership.insert().values(
|
|
etudid=etud.id, group_id=group.id
|
|
)
|
|
db.session.execute(new_row)
|
|
db.session.commit()
|
|
except IntegrityError:
|
|
db.session.rollback()
|
|
raise
|
|
return True
|
|
|
|
def create_group(self, group_name="", default=False) -> "GroupDescr":
|
|
"Crée un groupe dans cette partition"
|
|
if not self.formsemestre.can_change_groups():
|
|
raise AccessDenied(
|
|
"""Vous n'avez pas le droit d'effectuer cette opération,
|
|
ou bien le semestre est verrouillé !"""
|
|
)
|
|
if group_name:
|
|
group_name = group_name.strip()
|
|
if not group_name and not default:
|
|
raise ValueError("invalid group name: ()")
|
|
if not GroupDescr.check_name(self, group_name, default=default):
|
|
raise ScoValueError(
|
|
f"Le groupe {group_name} existe déjà dans cette partition"
|
|
)
|
|
numeros = [g.numero if g.numero is not None else 0 for g in self.groups]
|
|
if len(numeros) > 0:
|
|
new_numero = max(numeros) + 1
|
|
else:
|
|
new_numero = 0
|
|
group = GroupDescr(partition=self, group_name=group_name, numero=new_numero)
|
|
db.session.add(group)
|
|
db.session.commit()
|
|
log(f"create_group: created group_id={group.id}")
|
|
#
|
|
return group
|
|
|
|
|
|
class GroupDescr(db.Model, ScoDocModel):
|
|
"""Description d'un groupe d'une partition"""
|
|
|
|
__tablename__ = "group_descr"
|
|
__table_args__ = (db.UniqueConstraint("partition_id", "group_name"),)
|
|
|
|
id = db.Column(db.Integer, primary_key=True)
|
|
group_id = db.synonym("id")
|
|
partition_id = db.Column(db.Integer, db.ForeignKey("partition.id"))
|
|
group_name = db.Column(db.String(GROUPNAME_STR_LEN))
|
|
"""nom du groupe: "A", "C2", ... (NULL for 'all')"""
|
|
edt_id: str | None = db.Column(db.Text(), index=True, nullable=True)
|
|
"identifiant emplois du temps (unicité non imposée)"
|
|
numero = db.Column(db.Integer, nullable=False, default=0)
|
|
"Numero = ordre de presentation"
|
|
|
|
etuds = db.relationship(
|
|
"Identite",
|
|
secondary="group_membership",
|
|
lazy="dynamic",
|
|
)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"""<{self.__class__.__name__} {self.id} "{self.group_name or '(tous)'}">"""
|
|
)
|
|
|
|
def get_nom_with_part(self, default="-") -> str:
|
|
"""Nom avec partition: 'TD A'
|
|
Si groupe par défaut (tous), utilise default ou "-"
|
|
"""
|
|
if self.partition.partition_name is None:
|
|
return default
|
|
return f"{self.partition.partition_name or ''} {self.group_name or '-'}"
|
|
|
|
def to_dict(self, with_partition=True) -> dict:
|
|
"""as a dict, with or without partition"""
|
|
d = dict(self.__dict__)
|
|
d.pop("_sa_instance_state", None)
|
|
if with_partition:
|
|
d["partition"] = self.partition.to_dict(with_groups=False)
|
|
return d
|
|
|
|
def get_edt_ids(self) -> list[str]:
|
|
"les ids pour l'emploi du temps: à défaut, le nom scodoc du groupe"
|
|
return scu.split_id(self.edt_id) or [self.group_name] or []
|
|
|
|
def get_nb_inscrits(self) -> int:
|
|
"""Nombre inscrits à ce group et au formsemestre.
|
|
C'est nécessaire car lors d'une désinscription, on conserve l'appartenance
|
|
aux groupes pour faciliter une éventuelle ré-inscription.
|
|
"""
|
|
from app.models.formsemestre import FormSemestreInscription
|
|
|
|
return (
|
|
Identite.query.join(group_membership)
|
|
.filter_by(group_id=self.id)
|
|
.join(FormSemestreInscription)
|
|
.filter_by(formsemestre_id=self.partition.formsemestre.id)
|
|
.count()
|
|
)
|
|
|
|
@classmethod
|
|
def check_name(
|
|
cls, partition: "Partition", group_name: str, existing=False, default=False
|
|
) -> bool:
|
|
"""check if a group named 'group_name' can be created in the given partition.
|
|
If existing is True, allow a group_name already existing in the partition.
|
|
If default, name must be empty and default group must not yet exists.
|
|
"""
|
|
if not isinstance(group_name, str):
|
|
return False
|
|
if not default and not (0 < len(group_name.strip()) < GROUPNAME_STR_LEN):
|
|
return False
|
|
if (not existing) and (group_name in [g.group_name for g in partition.groups]):
|
|
return False
|
|
return True
|
|
|
|
def set_name(self, group_name: str, dest_url: str = None):
|
|
"""Set group name, and optionally edt_id.
|
|
Check permission (partition must be groups_editable)
|
|
and invalidate caches. Commit session.
|
|
dest_url is used for error messages.
|
|
"""
|
|
if not self.partition.formsemestre.can_change_groups():
|
|
raise AccessDenied("Vous n'avez pas le droit d'effectuer cette opération !")
|
|
if self.group_name is None:
|
|
raise ValueError("can't set a name to default group")
|
|
if not self.partition.groups_editable:
|
|
raise AccessDenied("Partition non éditable")
|
|
if group_name:
|
|
group_name = group_name.strip()
|
|
if not group_name:
|
|
raise ScoValueError("nom de groupe vide !", dest_url=dest_url)
|
|
if group_name != self.group_name and not GroupDescr.check_name(
|
|
self.partition, group_name
|
|
):
|
|
raise ScoValueError(
|
|
"Le nom de groupe existe déjà dans la partition", dest_url=dest_url
|
|
)
|
|
|
|
self.group_name = group_name
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
sco_cache.invalidate_formsemestre(
|
|
formsemestre_id=self.partition.formsemestre_id
|
|
)
|
|
|
|
def set_edt_id(self, edt_id: str):
|
|
"Set edt_id. Check permission. Commit session."
|
|
if not self.partition.formsemestre.can_change_groups():
|
|
raise AccessDenied("Vous n'avez pas le droit d'effectuer cette opération !")
|
|
if isinstance(edt_id, str):
|
|
edt_id = edt_id.strip() or None
|
|
self.edt_id = edt_id
|
|
db.session.add(self)
|
|
db.session.commit()
|
|
|
|
def remove_etud(self, etud: "Identite"):
|
|
"Enlève l'étudiant de ce groupe s'il en fait partie (ne fait rien sinon)"
|
|
if etud in self.etuds:
|
|
self.etuds.remove(etud)
|
|
db.session.commit()
|
|
Scolog.logdb(
|
|
method="group_remove_etud",
|
|
etudid=etud.id,
|
|
msg=f"Retrait du groupe {self.group_name} de {self.partition.partition_name}",
|
|
commit=True,
|
|
)
|
|
# Update parcours
|
|
if self.partition.partition_name == scu.PARTITION_PARCOURS:
|
|
self.partition.formsemestre.update_inscriptions_parcours_from_groups(
|
|
etudid=etud.id
|
|
)
|
|
sco_cache.invalidate_formsemestre(self.partition.formsemestre_id)
|
|
|
|
|
|
group_membership = db.Table(
|
|
"group_membership",
|
|
db.Column("etudid", db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE")),
|
|
db.Column("group_id", db.Integer, db.ForeignKey("group_descr.id")),
|
|
db.UniqueConstraint("etudid", "group_id"),
|
|
)
|
|
# class GroupMembership(db.Model):
|
|
# """Association groupe / étudiant"""
|
|
|
|
# __tablename__ = "group_membership"
|
|
# __table_args__ = (db.UniqueConstraint("etudid", "group_id"),)
|
|
# id = db.Column(db.Integer, primary_key=True)
|
|
# etudid = db.Column(db.Integer, db.ForeignKey("identite.id", ondelete="CASCADE"))
|
|
# group_id = db.Column(db.Integer, db.ForeignKey("group_descr.id"))
|