diff --git a/app/api/formations.py b/app/api/formations.py index 3274cb226..2712a2c28 100644 --- a/app/api/formations.py +++ b/app/api/formations.py @@ -321,6 +321,8 @@ def set_ue_parcours(ue_id: int): ] log(f"set_ue_parcours: ue_id={ue.id} parcours_ids={parcours_ids}") ok, error_message = ue.set_parcours(parcours) + if not ok: + return json_error(404, error_message) return {"status": ok, "message": error_message} diff --git a/app/api/semset.py b/app/api/semset.py index f1239534f..5eba3f3ee 100644 --- a/app/api/semset.py +++ b/app/api/semset.py @@ -7,33 +7,34 @@ """ ScoDoc 9 API : accès aux formsemestres """ -from flask import g, jsonify, request -from flask_login import login_required +# from flask import g, jsonify, request +# from flask_login import login_required -import app -from app.api import api_bp as bp, api_web_bp, API_CLIENT_ERROR -from app.decorators import scodoc, permission_required -from app.scodoc.sco_utils import json_error -from app.models.formsemestre import NotesSemSet -from app.scodoc.sco_permissions import Permission +# import app +# from app.api import api_bp as bp, api_web_bp, API_CLIENT_ERROR +# from app.decorators import scodoc, permission_required +# from app.scodoc.sco_utils import json_error +# from app.models.formsemestre import NotesSemSet +# from app.scodoc.sco_permissions import Permission -@bp.route("/semset/set_periode/", methods=["POST"]) -@api_web_bp.route("/semset/set_periode/", methods=["POST"]) -@login_required -@scodoc -@permission_required(Permission.ScoEditApo) -# TODO à modifier pour utiliser @as_json -def semset_set_periode(semset_id: int): - "Change la période d'un semset" - query = NotesSemSet.query.filter_by(semset_id=semset_id) - if g.scodoc_dept: - query = query.filter_by(dept_id=g.scodoc_dept_id) - semset: NotesSemSet = query.first_or_404() - data = request.get_json(force=True) # may raise 400 Bad Request - try: - periode = int(data) - semset.set_periode(periode) - except ValueError: - return json_error(API_CLIENT_ERROR, "invalid periode value") - return jsonify({"OK": True}) +# Impossible de changer la période à cause des archives +# @bp.route("/semset/set_periode/", methods=["POST"]) +# @api_web_bp.route("/semset/set_periode/", methods=["POST"]) +# @login_required +# @scodoc +# @permission_required(Permission.ScoEditApo) +# # TODO à modifier pour utiliser @as_json +# def semset_set_periode(semset_id: int): +# "Change la période d'un semset" +# query = NotesSemSet.query.filter_by(semset_id=semset_id) +# if g.scodoc_dept: +# query = query.filter_by(dept_id=g.scodoc_dept_id) +# semset: NotesSemSet = query.first_or_404() +# data = request.get_json(force=True) # may raise 400 Bad Request +# try: +# periode = int(data) +# semset.set_periode(periode) +# except ValueError: +# return json_error(API_CLIENT_ERROR, "invalid periode value") +# return jsonify({"OK": True}) diff --git a/app/but/bulletin_but.py b/app/but/bulletin_but.py index 481393c5e..eb71373f1 100644 --- a/app/but/bulletin_but.py +++ b/app/but/bulletin_but.py @@ -484,6 +484,7 @@ class BulletinBUT: d["etudid"] = etud.id d["etud"] = d["etudiant"] d["etud"]["nomprenom"] = etud.nomprenom + d["etud"]["etat_civil"] = etud.etat_civil d.update(self.res.sem) etud_etat = self.res.get_etud_etat(etud.id) d["filigranne"] = sco_bulletins_pdf.get_filigranne( diff --git a/app/comp/res_compat.py b/app/comp/res_compat.py index 05f52905c..cbebb8356 100644 --- a/app/comp/res_compat.py +++ b/app/comp/res_compat.py @@ -19,6 +19,7 @@ from app.models import Identite, FormSemestre, ModuleImpl, ScolarAutorisationIns from app.scodoc.codes_cursus import UE_SPORT, DEF from app.scodoc import sco_utils as scu + # Pour raccorder le code des anciens codes qui attendent une NoteTable class NotesTableCompat(ResultatsSemestre): """Implementation partielle de NotesTable @@ -266,16 +267,21 @@ class NotesTableCompat(ResultatsSemestre): ue_status_list.append(ue_status) return self.parcours.check_barre_ues(ue_status_list) - def all_etuds_have_sem_decisions(self): - """True si tous les étudiants du semestre ont une décision de jury. - Ne regarde pas les décisions d'UE. + def etudids_without_decisions(self) -> list[int]: + """Liste des id d'étudiants du semestre non démissionnaires + n'ayant pas de décision de jury. + - En classic: ne regarde pas que la décision de semestre (pas les décisions d'UE). + - en BUT: utilise etud_has_decision """ - for ins in self.formsemestre.inscriptions: - if ins.etat != scu.INSCRIT: - continue # skip démissionnaires - if self.get_etud_decision_sem(ins.etudid) is None: - return False - return True + check_func = ( + self.etud_has_decision if self.is_apc else self.get_etud_decision_sem + ) + etudids = [ + ins.etudid + for ins in self.formsemestre.inscriptions + if (ins.etat == scu.INSCRIT) and (not check_func(ins.etudid)) + ] + return etudids def etud_has_decision(self, etudid): """True s'il y a une décision de jury pour cet étudiant émanant de ce formsemestre. @@ -316,7 +322,8 @@ class NotesTableCompat(ResultatsSemestre): def get_etud_decision_sem(self, etudid: int) -> dict: """Decision du jury semestre prise pour cet etudiant, ou None s'il n'y en pas eu. { 'code' : None|ATT|..., 'assidu' : 0|1, 'event_date' : , compense_formsemestre_id } - Si état défaillant, force le code a DEF + Si état défaillant, force le code a DEF. + Toujours None en BUT. """ if self.get_etud_etat(etudid) == DEF: return { diff --git a/app/models/etudiants.py b/app/models/etudiants.py index 09534f6dd..723ff53f5 100644 --- a/app/models/etudiants.py +++ b/app/models/etudiants.py @@ -30,6 +30,7 @@ class Identite(db.Model): db.UniqueConstraint("dept_id", "code_nip"), db.UniqueConstraint("dept_id", "code_ine"), db.CheckConstraint("civilite IN ('M', 'F', 'X')"), + db.CheckConstraint("civilite_etat_civil IN ('M', 'F', 'X')"), ) id = db.Column(db.Integer, primary_key=True) @@ -41,6 +42,12 @@ class Identite(db.Model): nom_usuel = db.Column(db.Text()) "optionnel (si present, affiché à la place du nom)" civilite = db.Column(db.String(1), nullable=False) + + # données d'état-civil. Si présent remplace les données d'usage dans les documents officiels (bulletins, PV) + # cf nomprenom_etat_civil() + civilite_etat_civil = db.Column(db.String(1), nullable=False, server_default="X") + prenom_etat_civil = db.Column(db.Text(), nullable=False, server_default="") + date_naissance = db.Column(db.Date) lieu_naissance = db.Column(db.Text()) dept_naissance = db.Column(db.Text()) @@ -108,6 +115,13 @@ class Identite(db.Model): """ return {"M": "M.", "F": "Mme", "X": ""}[self.civilite] + @property + def civilite_etat_civil_str(self): + """returns 'M.' ou 'Mme' ou '' (pour le genre neutre, + personnes ne souhaitant pas d'affichage). + """ + return {"M": "M.", "F": "Mme", "X": ""}[self.civilite_etat_civil] + def sex_nom(self, no_accents=False) -> str: "'M. DUPONTÉ', ou si no_accents, 'M. DUPONTE'" s = f"{self.civilite_str} {(self.nom_usuel or self.nom).upper()}" @@ -154,6 +168,14 @@ class Identite(db.Model): r.append("-".join([x.lower().capitalize() for x in fields])) return " ".join(r) + @property + def etat_civil(self): + if self.prenom_etat_civil: + civ = {"M": "M.", "F": "Mme", "X": ""}[self.civilite_etat_civil] + return f"{civ} {self.prenom_etat_civil} {self.nom}" + else: + return self.nomprenom + @property def nom_short(self): "Nom et début du prénom pour table recap: 'DUPONT Pi.'" @@ -195,6 +217,8 @@ class Identite(db.Model): "nom_usuel": self.nom_usuel, "prenom": self.prenom, "sort_key": self.sort_key, + "civilite_etat_civil": self.civilite_etat_civil, + "prenom_etat_civil": self.prenom_etat_civil, } def to_dict_scodoc7(self) -> dict: @@ -238,6 +262,8 @@ class Identite(db.Model): "dept_naissance": self.dept_naissance or "", "nationalite": self.nationalite or "", "boursier": self.boursier or "", + "civilite_etat_civil": self.civilite_etat_civil, + "prenom_etat_civil": self.prenom_etat_civil, } if include_urls and has_request_context(): # test request context so we can use this func in tests under the flask shell @@ -454,10 +480,10 @@ class Identite(db.Model): M. Pierre Dupont """ if with_paragraph: - return f"""{self.nomprenom}{line_sep}n°{self.code_nip or ""}{line_sep}né{self.e} le { + return f"""{self.etat_civil}{line_sep}n°{self.code_nip or ""}{line_sep}né{self.e} le { self.date_naissance.strftime("%d/%m/%Y") if self.date_naissance else ""}{ line_sep}à {self.lieu_naissance or ""}""" - return self.nomprenom + return self.etat_civil def photo_html(self, title=None, size="small") -> str: """HTML img tag for the photo, either in small size (h90) diff --git a/app/models/formsemestre.py b/app/models/formsemestre.py index 6e00649d3..b061b248a 100644 --- a/app/models/formsemestre.py +++ b/app/models/formsemestre.py @@ -198,11 +198,14 @@ class FormSemestre(db.Model): d["date_fin"] = d["date_fin_iso"] = "" d["responsables"] = [u.id for u in self.responsables] d["titre_formation"] = self.titre_formation() - if convert_objects: + if convert_objects: # pour API d["parcours"] = [p.to_dict() for p in self.get_parcours_apc()] d["departement"] = self.departement.to_dict() d["formation"] = self.formation.to_dict() d["etape_apo"] = self.etapes_apo_str() + else: + # Converti les étapes Apogee sous forme d'ApoEtapeVDI (compat scodoc7) + d["etapes"] = [e.as_apovdi() for e in self.etapes] return d def to_dict_api(self): @@ -923,7 +926,7 @@ class FormSemestreEtape(db.Model): def __repr__(self): return f"" - def as_apovdi(self): + def as_apovdi(self) -> ApoEtapeVDI: return ApoEtapeVDI(self.etape_apo) diff --git a/app/models/ues.py b/app/models/ues.py index e301530bd..96074bdb9 100644 --- a/app/models/ues.py +++ b/app/models/ues.py @@ -325,7 +325,8 @@ class UniteEns(db.Model): if self.niveau_competence_id is not None: return ( False, - f"{self.acronyme} déjà associée à un niveau de compétences", + f"""{self.acronyme} déjà associée à un niveau de compétences ({ + self.id}, {self.niveau_competence_id})""", ) if ( niveau.competence.referentiel.id @@ -358,6 +359,7 @@ class UniteEns(db.Model): Si un niveau est déjà associé, vérifie sa cohérence. Renvoie (True, "") si ok, sinon (False, error_message) """ + msg = "" # Le niveau est-il dans tous ces parcours ? Sinon, l'enlève prev_niveau = self.niveau_competence if ( @@ -366,6 +368,7 @@ class UniteEns(db.Model): and self.niveau_competence.id not in self._parcours_niveaux_ids(parcours) ): self.niveau_competence = None + msg = " (niveau compétence désassocié !)" if parcours and self.niveau_competence: ok, error_message = self.check_niveau_unique_dans_parcours( @@ -381,12 +384,12 @@ class UniteEns(db.Model): # Invalidation du cache self.formation.invalidate_cached_sems() log(f"ue.set_parcours( {self}, {parcours} )") - return True, "" + return True, "parcours enregistrés" + msg def add_parcour(self, parcour: ApcParcours) -> tuple[bool, str]: """Ajoute ce parcours à ceux de l'UE""" if parcour.id in {p.id for p in self.parcours}: - return True, "" + return True, "" # déjà présent if parcour.referentiel.id != self.formation.referentiel_competence.id: return False, "Le parcours n'appartient pas au référentiel de la formation" diff --git a/app/scodoc/sco_apogee_compare.py b/app/scodoc/sco_apogee_compare.py index 0c93c6652..3b82862db 100644 --- a/app/scodoc/sco_apogee_compare.py +++ b/app/scodoc/sco_apogee_compare.py @@ -46,13 +46,14 @@ Pour chaque étudiant commun: from flask import g, url_for from app import log -from app.scodoc import sco_apogee_csv +from app.scodoc import sco_apogee_csv, sco_apogee_reader +from app.scodoc.sco_apogee_csv import ApoData from app.scodoc.gen_tables import GenTable from app.scodoc.sco_exceptions import ScoValueError from app.scodoc import html_sco_header from app.scodoc import sco_preferences -_help_txt = """ +_HELP_TXT = """

Outil de comparaison de fichiers (maquettes CSV) Apogée.

@@ -69,7 +70,7 @@ def apo_compare_csv_form(): """

Comparaison de fichiers Apogée

""", - _help_txt, + _HELP_TXT, """
Fichier Apogée A: @@ -109,14 +110,14 @@ def apo_compare_csv(file_a, file_b, autodetect=True): raise ScoValueError( f""" Erreur: l'encodage de l'un des fichiers est incorrect. - Vérifiez qu'il est bien en {sco_apogee_csv.APO_INPUT_ENCODING} + Vérifiez qu'il est bien en {sco_apogee_reader.APO_INPUT_ENCODING} """, dest_url=dest_url, ) from exc H = [ html_sco_header.sco_header(page_title="Comparaison de fichiers Apogée"), "

Comparaison de fichiers Apogée

", - _help_txt, + _HELP_TXT, '
', _apo_compare_csv(apo_data_a, apo_data_b), "
", @@ -130,17 +131,17 @@ def _load_apo_data(csvfile, autodetect=True): "Read data from request variable and build ApoData" data_b = csvfile.read() if autodetect: - data_b, message = sco_apogee_csv.fix_data_encoding(data_b) + data_b, message = sco_apogee_reader.fix_data_encoding(data_b) if message: log(f"apo_compare_csv: {message}") if not data_b: raise ScoValueError("fichier vide ? (apo_compare_csv: no data)") - data = data_b.decode(sco_apogee_csv.APO_INPUT_ENCODING) + data = data_b.decode(sco_apogee_reader.APO_INPUT_ENCODING) apo_data = sco_apogee_csv.ApoData(data, orig_filename=csvfile.filename) return apo_data -def _apo_compare_csv(A, B): +def _apo_compare_csv(apo_a: ApoData, apo_b: ApoData): """Generate html report comparing A and B, two instances of ApoData representing Apogee CSV maquettes. """ @@ -148,74 +149,75 @@ def _apo_compare_csv(A, B): # 1-- Check etape and codes L.append('
En-tête
') L.append('
Nom fichier A:') - L.append(A.orig_filename) + L.append(apo_a.orig_filename) L.append("
") L.append('
Nom fichier B:') - L.append(B.orig_filename) + L.append(apo_b.orig_filename) L.append("
") L.append('
Étape Apogée:') - if A.etape_apogee != B.etape_apogee: + if apo_a.etape_apogee != apo_b.etape_apogee: L.append( - '%s != %s' % (A.etape_apogee, B.etape_apogee) + f"""{apo_a.etape_apogee} != {apo_b.etape_apogee}""" ) else: - L.append('%s' % (A.etape_apogee,)) + L.append(f"""{apo_a.etape_apogee}""") L.append("
") L.append('
VDI Apogée:') - if A.vdi_apogee != B.vdi_apogee: - L.append('%s != %s' % (A.vdi_apogee, B.vdi_apogee)) + if apo_a.vdi_apogee != apo_b.vdi_apogee: + L.append( + f"""{apo_a.vdi_apogee} != {apo_b.vdi_apogee}""" + ) else: - L.append('%s' % (A.vdi_apogee,)) + L.append(f"""{apo_a.vdi_apogee}""") L.append("
") L.append('
Code diplôme :') - if A.cod_dip_apogee != B.cod_dip_apogee: + if apo_a.cod_dip_apogee != apo_b.cod_dip_apogee: L.append( - '%s != %s' - % (A.cod_dip_apogee, B.cod_dip_apogee) + f"""{apo_a.cod_dip_apogee} != {apo_b.cod_dip_apogee}""" ) else: - L.append('%s' % (A.cod_dip_apogee,)) + L.append(f"""{apo_a.cod_dip_apogee}""") L.append("
") L.append('
Année scolaire :') - if A.annee_scolaire != B.annee_scolaire: + if apo_a.annee_scolaire != apo_b.annee_scolaire: L.append( '%s != %s' - % (A.annee_scolaire, B.annee_scolaire) + % (apo_a.annee_scolaire, apo_b.annee_scolaire) ) else: - L.append('%s' % (A.annee_scolaire,)) + L.append('%s' % (apo_a.annee_scolaire,)) L.append("
") # Colonnes: - A_elts = set(A.apo_elts.keys()) - B_elts = set(B.apo_elts.keys()) + a_elts = set(apo_a.apo_csv.apo_elts.keys()) + b_elts = set(apo_b.apo_csv.apo_elts.keys()) L.append('
Éléments Apogée :') - if A_elts == B_elts: - L.append('%d' % len(A_elts)) + if a_elts == b_elts: + L.append(f"""{len(a_elts)}""") else: - elts_communs = A_elts.intersection(B_elts) - elts_only_A = A_elts - A_elts.intersection(B_elts) - elts_only_B = B_elts - A_elts.intersection(B_elts) + elts_communs = a_elts.intersection(b_elts) + elts_only_a = a_elts - a_elts.intersection(b_elts) + elts_only_b = b_elts - a_elts.intersection(b_elts) L.append( 'différents (%d en commun, %d seulement dans A, %d seulement dans B)' % ( len(elts_communs), - len(elts_only_A), - len(elts_only_B), + len(elts_only_a), + len(elts_only_b), ) ) - if elts_only_A: + if elts_only_a: L.append( '
Éléments seulement dans A : %s
' - % ", ".join(sorted(elts_only_A)) + % ", ".join(sorted(elts_only_a)) ) - if elts_only_B: + if elts_only_b: L.append( '
Éléments seulement dans B : %s
' - % ", ".join(sorted(elts_only_B)) + % ", ".join(sorted(elts_only_b)) ) L.append("
") L.append("
") # /section @@ -223,22 +225,21 @@ def _apo_compare_csv(A, B): # 2-- L.append('
Étudiants
') - A_nips = set(A.etud_by_nip) - B_nips = set(B.etud_by_nip) - nb_etuds_communs = len(A_nips.intersection(B_nips)) - nb_etuds_dif = len(A_nips.union(B_nips) - A_nips.intersection(B_nips)) + a_nips = set(apo_a.etud_by_nip) + b_nips = set(apo_b.etud_by_nip) + nb_etuds_communs = len(a_nips.intersection(b_nips)) + nb_etuds_dif = len(a_nips.union(b_nips) - a_nips.intersection(b_nips)) L.append("""
Liste d'étudiants :""") - if A_nips == B_nips: + if a_nips == b_nips: L.append( - """ - %d étudiants (tous présents dans chaque fichier) + f""" + {len(a_nips)} étudiants (tous présents dans chaque fichier) """ - % len(A_nips) ) else: L.append( - 'différents (%d en commun, %d différents)' - % (nb_etuds_communs, nb_etuds_dif) + f"""différents ({nb_etuds_communs} en commun, { + nb_etuds_dif} différents)""" ) L.append("
") L.append("
") # /section @@ -247,19 +248,22 @@ def _apo_compare_csv(A, B): if nb_etuds_communs > 0: L.append( """
-
Différences de résultats des étudiants présents dans les deux fichiers
+
Différences de résultats des étudiants présents dans les deux fichiers +

""" ) - T = apo_table_compare_etud_results(A, B) + T = apo_table_compare_etud_results(apo_a, apo_b) if T.get_nb_rows() > 0: L.append(T.html()) else: L.append( - """

aucune différence de résultats - sur les %d étudiants communs (les éléments Apogée n'apparaissant pas dans les deux fichiers sont omis)

+ f"""

aucune différence de résultats + sur les {nb_etuds_communs} étudiants communs + (les éléments Apogée n'apparaissant pas dans les deux + fichiers sont omis) +

""" - % nb_etuds_communs ) L.append("
") # /section @@ -290,19 +294,17 @@ def apo_table_compare_etud_results(A, B): def _build_etud_res(e, apo_data): r = {} - for elt_code in apo_data.apo_elts: - elt = apo_data.apo_elts[elt_code] + for elt_code in apo_data.apo_csv.apo_elts: + elt = apo_data.apo_csv.apo_elts[elt_code] try: # les colonnes de cet élément - col_ids_type = [ - (ec["apoL_a01_code"], ec["Type R\xc3\xa9s."]) for ec in elt.cols - ] + col_ids_type = [(ec["apoL_a01_code"], ec["Type Rés."]) for ec in elt.cols] except KeyError as exc: raise ScoValueError( - "Erreur: un élément sans 'Type R\xc3\xa9s.'. Vérifiez l'encodage de vos fichiers." + "Erreur: un élément sans 'Type Rés.'. Vérifiez l'encodage de vos fichiers." ) from exc r[elt_code] = {} - for (col_id, type_res) in col_ids_type: + for col_id, type_res in col_ids_type: r[elt_code][type_res] = e.cols[col_id] return r diff --git a/app/scodoc/sco_apogee_csv.py b/app/scodoc/sco_apogee_csv.py index 928d4b4af..2042c4db9 100644 --- a/app/scodoc/sco_apogee_csv.py +++ b/app/scodoc/sco_apogee_csv.py @@ -1,6 +1,3 @@ -# -*- mode: python -*- -# -*- coding: utf-8 -*- - ############################################################################## # # Gestion scolarite IUT @@ -30,57 +27,12 @@ Ce code a été au départ inspiré par les travaux de Damien Mascré, scodoc2apogee (en Java). A utiliser en fin de semestre, après les jury. - On communique avec Apogée via des fichiers CSV. -Le fichier CSV, champs séparés par des tabulations, a la structure suivante: - -
- XX-APO_TITRES-XX
- apoC_annee	2007/2008
- apoC_cod_dip	VDTCJ
- apoC_Cod_Exp	1
- apoC_cod_vdi	111
- apoC_Fichier_Exp	VDTCJ_V1CJ.txt
- apoC_lib_dip	DUT CJ
- apoC_Titre1	Export Apogée du 13/06/2008 à 14:29
- apoC_Titre2
-
- XX-APO_COLONNES-XX
- apoL_a01_code	Type Objet	Code	Version	Année	Session	Admission/Admissibilité	Type Rés.			Etudiant	Numéro
- apoL_a02_nom										1	Nom
- apoL_a03_prenom										1	Prénom
- apoL_a04_naissance									Session	Admissibilité	Naissance
- APO_COL_VAL_DEB
- apoL_c0001	VET	V1CJ	111	2007	0	1	N	V1CJ - DUT CJ an1	0	1	Note
- apoL_c0002	VET	V1CJ	111	2007	0	1	B		0	1	Barème
- apoL_c0003	VET	V1CJ	111	2007	0	1	R		0	1	Résultat
- APO_COL_VAL_FIN
- apoL_c0030	APO_COL_VAL_FIN
-
- XX-APO_VALEURS-XX
- apoL_a01_code	apoL_a02_nom	apoL_a03_prenom	apoL_a04_naissance	apoL_c0001	apoL_c0002	apoL_c0003	apoL_c0004	apoL_c0005	apoL_c0006	apoL_c0007	apoL_c0008	apoL_c0009	apoL_c0010	apoL_c0011	apoL_c0012	apoL_c0013	apoL_c0014	apoL_c0015	apoL_c0016	apoL_c0017	apoL_c0018	apoL_c0019	apoL_c0020	apoL_c0021	apoL_c0022	apoL_c0023	apoL_c0024	apoL_c0025	apoL_c0026	apoL_c0027	apoL_c0028	apoL_c0029
- 10601232	AARIF	MALIKA	 22/09/1986	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM	18	20	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM
- 
- - - On récupère nos éléments pédagogiques dans la section XX-APO-COLONNES-XX et - notre liste d'étudiants dans la section XX-APO_VALEURS-XX. Les champs de la - section XX-APO_VALEURS-XX sont décrits par les lignes successives de la - section XX-APO_COLONNES-XX. - - Le fichier CSV correspond à une étape, qui est récupérée sur la ligne -
- apoL_c0001	VET	V1CJ ...
- 
- - -XXX A vérifier: - AJAC car 1 sem. validé et pas de NAR +XXX A vérifier: AJAC car 1 sem. validé et pas de NAR """ -import collections import datetime from functools import reduce import functools @@ -94,8 +46,6 @@ from zipfile import ZipFile from flask import send_file import numpy as np -# Pour la détection auto de l'encodage des fichiers Apogée: -from chardet import detect as chardet_detect from app import log from app.comp import res_sem @@ -103,6 +53,11 @@ from app.comp.res_compat import NotesTableCompat from app.comp.res_but import ResultatsSemestreBUT from app.models import FormSemestre, Identite, ApcValidationAnnee from app.models.config import ScoDocSiteConfig +from app.scodoc.sco_apogee_reader import ( + APO_DECIMAL_SEP, + ApoCSVReadWrite, + ApoEtudTuple, +) import app.scodoc.sco_utils as scu from app.scodoc.sco_exceptions import ScoValueError, ScoFormatError from app.scodoc.gen_tables import GenTable @@ -118,15 +73,6 @@ from app.scodoc import sco_cursus from app.scodoc import sco_formsemestre from app.scodoc import sco_etud -APO_PORTAL_ENCODING = ( - "utf8" # encodage du fichier CSV Apogée (était 'ISO-8859-1' avant jul. 2016) -) -APO_INPUT_ENCODING = "ISO-8859-1" # -APO_OUTPUT_ENCODING = APO_INPUT_ENCODING # encodage des fichiers Apogee générés -APO_DECIMAL_SEP = "," # separateur décimal: virgule -APO_SEP = "\t" -APO_NEWLINE = "\r\n" - def _apo_fmt_note(note, fmt="%3.2f"): "Formatte une note pour Apogée (séparateur décimal: ',')" @@ -141,89 +87,6 @@ def _apo_fmt_note(note, fmt="%3.2f"): return (fmt % val).replace(".", APO_DECIMAL_SEP) -def guess_data_encoding(text, threshold=0.6): - """Guess string encoding, using chardet heuristics. - Returns encoding, or None if detection failed (confidence below threshold) - """ - r = chardet_detect(text) - if r["confidence"] < threshold: - return None - else: - return r["encoding"] - - -def fix_data_encoding( - text: bytes, - default_source_encoding=APO_INPUT_ENCODING, - dest_encoding=APO_INPUT_ENCODING, -) -> tuple[bytes, str]: - """Try to ensure that text is using dest_encoding - returns converted text, and a message describing the conversion. - - Raises UnicodeEncodeError en cas de problème, en général liée à - une auto-détection errornée. - """ - message = "" - detected_encoding = guess_data_encoding(text) - if not detected_encoding: - if default_source_encoding != dest_encoding: - message = f"converting from {default_source_encoding} to {dest_encoding}" - text = text.decode(default_source_encoding).encode(dest_encoding) - else: - if detected_encoding != dest_encoding: - message = ( - f"converting from detected {default_source_encoding} to {dest_encoding}" - ) - text = text.decode(detected_encoding).encode(dest_encoding) - return text, message - - -class StringIOFileLineWrapper: - def __init__(self, data: str): - self.f = io.StringIO(data) - self.lineno = 0 - - def close(self): - return self.f.close() - - def readline(self): - self.lineno += 1 - return self.f.readline() - - -class DictCol(dict): - "A dict, where we can add attributes" - pass - - -class ApoElt(object): - """Definition d'un Element Apogee - sur plusieurs colonnes du fichier CSV - """ - - def __init__(self, cols): - assert len(cols) > 0 - assert len(set([c["Code"] for c in cols])) == 1 # colonnes de meme code - assert len(set([c["Type Objet"] for c in cols])) == 1 # colonnes de meme type - self.cols = cols - self.code = cols[0]["Code"] - self.version = cols[0]["Version"] - self.type_objet = cols[0]["Type Objet"] - - def append(self, col): - assert col["Code"] == self.code - if col["Type Objet"] != self.type_objet: - log( - "Warning: ApoElt: duplicate id %s (%s and %s)" - % (self.code, self.type_objet, col["Type Objet"]) - ) - self.type_objet = col["Type Objet"] - self.cols.append(col) - - def __repr__(self): - return f"ApoElt(code='{self.code}', cols={pprint.pformat(self.cols)})" - - class EtuCol: """Valeurs colonnes d'un element pour un etudiant""" @@ -243,11 +106,7 @@ class ApoEtud(dict): def __init__( self, - nip="", - nom="", - prenom="", - naissance="", - cols={}, + apo_etud_tuple: ApoEtudTuple, export_res_etape=True, export_res_sem=True, export_res_ues=True, @@ -255,21 +114,20 @@ class ApoEtud(dict): export_res_sdj=True, export_res_rat=True, ): - self["nip"] = nip - self["nom"] = nom - self["prenom"] = prenom - self["naissance"] = naissance - self.cols = cols + self["nip"] = apo_etud_tuple.nip + self["nom"] = apo_etud_tuple.nom + self["prenom"] = apo_etud_tuple.prenom + self["naissance"] = apo_etud_tuple.naissance + self.cols = apo_etud_tuple.cols "{ col_id : value } colid = 'apoL_c0001'" self.is_apc = None "Vrai si BUT" self.col_elts = {} "{'V1RT': {'R': 'ADM', 'J': '', 'B': 20, 'N': '12.14'}}" - self.new_cols = {} # { col_id : value to record in csv } self.etud: Identite = None "etudiant ScoDoc associé" self.etat = None # ETUD_OK, ... - self.is_NAR = False + self.is_nar = False "True si NARé dans un semestre" self.log = [] self.has_logged_no_decision = False @@ -283,6 +141,17 @@ class ApoEtud(dict): self.fmt_note = functools.partial( _apo_fmt_note, fmt=ScoDocSiteConfig.get_code_apo("NOTES_FMT") or "3.2f" ) + # Initialisés par associate_sco: + self.autre_sem: dict = None + self.autre_res: NotesTableCompat = None + self.cur_sem: dict = None + self.cur_res: NotesTableCompat = None + self.new_cols = {} + "{ col_id : value to record in csv }" + + # Pour le BUT: + self.validation_annee_but: ApcValidationAnnee = None + "validation de jury annuelle BUT, ou None" def __repr__(self): return f"""ApoEtud( nom='{self["nom"]}', nip='{self["nip"]}' )""" @@ -334,25 +203,24 @@ class ApoEtud(dict): ) # etudiant inconnu, recopie les valeurs existantes dans Apo else: sco_elts = {} # valeurs trouvées dans ScoDoc code : { N, B, J, R } - for col_id in apo_data.col_ids[4:]: - code = apo_data.cols[col_id]["Code"] # 'V1RT' - el = sco_elts.get( - code, None - ) # {'R': ADM, 'J': '', 'B': 20, 'N': '12.14'} - if el is None: # pas déjà trouvé - cur_sem, autre_sem = self.etud_semestres_de_etape(apo_data) + for col_id in apo_data.apo_csv.col_ids[4:]: + code = apo_data.apo_csv.cols[col_id]["Code"] # 'V1RT' + elt = sco_elts.get(code, None) + # elt est {'R': ADM, 'J': '', 'B': 20, 'N': '12.14'} + if elt is None: # pas déjà trouvé + self.etud_set_semestres_de_etape(apo_data) for sem in apo_data.sems_etape: - el = self.search_elt_in_sem(code, sem, cur_sem, autre_sem) - if el is not None: - sco_elts[code] = el + elt = self.search_elt_in_sem(code, sem) + if elt is not None: + sco_elts[code] = elt break - self.col_elts[code] = el - if el is None: + self.col_elts[code] = elt + if elt is None: self.new_cols[col_id] = self.cols[col_id] else: try: self.new_cols[col_id] = sco_elts[code][ - apo_data.cols[col_id]["Type Rés."] + apo_data.apo_csv.cols[col_id]["Type Rés."] ] except KeyError as exc: log( @@ -365,15 +233,15 @@ class ApoEtud(dict): (vérifier qu'il est bien associé à une UE ou semestre)?""" ) from exc # recopie les 4 premieres colonnes (nom, ..., naissance): - for col_id in apo_data.col_ids[:4]: + for col_id in apo_data.apo_csv.col_ids[:4]: self.new_cols[col_id] = self.cols[col_id] # def unassociated_codes(self, apo_data): # "list of apo elements for this student without a value in ScoDoc" - # codes = set([apo_data.cols[col_id].code for col_id in apo_data.col_ids]) + # codes = set([apo_data.apo_csv.cols[col_id].code for col_id in apo_data.apo_csv.col_ids]) # return codes - set(sco_elts) - def search_elt_in_sem(self, code, sem, cur_sem, autre_sem) -> dict: + def search_elt_in_sem(self, code, sem) -> dict: """ VET code jury etape (en BUT, le code annuel) ELP élément pédagogique: UE, module @@ -387,20 +255,29 @@ class ApoEtud(dict): code (str): code apo de l'element cherché sem (dict): semestre dans lequel on cherche l'élément cur_sem (dict): semestre "courant" pour résultats annuels (VET) - autre_sem (dict): autre semestre utilisé pour calculé les résultats annuels (VET) + autre_sem (dict): autre semestre utilisé pour calculer les résultats annuels (VET) Returns: dict: with N, B, J, R keys, ou None si elt non trouvé """ etudid = self.etud["etudid"] - formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"]) - nt: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) + if not self.cur_res: + log("search_elt_in_sem: no cur_res !") + return None + if sem["formsemestre_id"] == self.cur_res.formsemestre.id: + res = self.cur_res + elif ( + self.autre_res and sem["formsemestre_id"] == self.autre_res.formsemestre.id + ): + res = self.autre_res + else: + formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"]) + res: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) - if etudid not in nt.identdict: + if etudid not in res.identdict: return None # etudiant non inscrit dans ce semestre - decision = nt.get_etud_decision_sem(etudid) - if not self.export_res_sdj and not decision: + if not self.export_res_sdj and not res.etud_has_decision(etudid): # pas de decision de jury, on n'enregistre rien # (meme si démissionnaire) if not self.has_logged_no_decision: @@ -408,43 +285,46 @@ class ApoEtud(dict): self.has_logged_no_decision = True return VOID_APO_RES - if decision and decision["code"] == NAR: - self.is_NAR = True + if res.is_apc: # export BUT + self._but_load_validation_annuelle() + else: + decision = res.get_etud_decision_sem(etudid) + if decision and decision["code"] == NAR: + self.is_nar = True + # Element semestre: (non BUT donc) + if code in {x.strip() for x in sem["elt_sem_apo"].split(",")}: + if self.export_res_sem: + return self.comp_elt_semestre(res, decision, etudid) + else: + return VOID_APO_RES # Element etape (annuel ou non): if sco_formsemestre.sem_has_etape(sem, code) or ( code in {x.strip() for x in sem["elt_annee_apo"].split(",")} ): export_res_etape = self.export_res_etape - if (not export_res_etape) and cur_sem: + if (not export_res_etape) and self.cur_sem: # exporte toujours le résultat de l'étape si l'étudiant est diplômé Se = sco_cursus.get_situation_etud_cursus( - self.etud, cur_sem["formsemestre_id"] + self.etud, self.cur_sem["formsemestre_id"] ) export_res_etape = Se.all_other_validated() if export_res_etape: - return self.comp_elt_annuel(etudid, cur_sem, autre_sem) + return self.comp_elt_annuel(etudid) else: self.log.append("export étape désactivé") return VOID_APO_RES - # Element semestre: - if code in {x.strip() for x in sem["elt_sem_apo"].split(",")}: - if self.export_res_sem: - return self.comp_elt_semestre(nt, decision, etudid) - else: - return VOID_APO_RES - # Elements UE - decisions_ue = nt.get_etud_decisions_ue(etudid) - for ue in nt.get_ues_stat_dict(): + decisions_ue = res.get_etud_decisions_ue(etudid) + for ue in res.get_ues_stat_dict(): if ue["code_apogee"] and code in { x.strip() for x in ue["code_apogee"].split(",") }: if self.export_res_ues: if decisions_ue and ue["ue_id"] in decisions_ue: - ue_status = nt.get_etud_ue_status(etudid, ue["ue_id"]) + ue_status = res.get_etud_ue_status(etudid, ue["ue_id"]) code_decision_ue = decisions_ue[ue["ue_id"]]["code"] return dict( N=self.fmt_note(ue_status["moy"] if ue_status else ""), @@ -459,14 +339,14 @@ class ApoEtud(dict): return VOID_APO_RES # Elements Modules - modimpls = nt.get_modimpls_dict() + modimpls = res.get_modimpls_dict() module_code_found = False for modimpl in modimpls: module = modimpl["module"] if module["code_apogee"] and code in { x.strip() for x in module["code_apogee"].split(",") }: - n = nt.get_etud_mod_moy(modimpl["moduleimpl_id"], etudid) + n = res.get_etud_mod_moy(modimpl["moduleimpl_id"], etudid) if n != "NI" and self.export_res_modules: return dict(N=self.fmt_note(n), B=20, J="", R="") else: @@ -476,10 +356,11 @@ class ApoEtud(dict): # return None # element Apogee non trouvé dans ce semestre - def comp_elt_semestre(self, nt, decision, etudid): - """Calcul résultat apo semestre""" - if self.is_apc: - # pas de code semestre en APC ! + def comp_elt_semestre(self, nt: NotesTableCompat, decision: dict, etudid: int): + """Calcul résultat apo semestre. + Toujours vide pour en BUT/APC. + """ + if self.is_apc: # garde fou: pas de code semestre en APC ! return dict(N="", B=20, J="", R="", M="") if decision is None: etud = Identite.get_etud(etudid) @@ -496,7 +377,7 @@ class ApoEtud(dict): note_str = self.fmt_note(note) return dict(N=note_str, B=20, J="", R=decision_apo, M="") - def comp_elt_annuel(self, etudid, cur_sem, autre_sem): + def comp_elt_annuel(self, etudid): """Calcul resultat annuel (VET) à partir du semestre courant et de l'autre (le suivant ou le précédent complétant l'année scolaire) En BUT, c'est la décision de jury annuelle (ApcValidationAnnee). @@ -514,16 +395,16 @@ class ApoEtud(dict): # XXX cette règle est discutable, à valider # log('comp_elt_annuel cur_sem=%s autre_sem=%s' % (cur_sem['formsemestre_id'], autre_sem['formsemestre_id'])) - if not cur_sem: + if not self.cur_sem: # l'étudiant n'a pas de semestre courant ?! self.log.append("pas de semestre courant") log(f"comp_elt_annuel: etudid {etudid} has no cur_sem") return VOID_APO_RES - cur_formsemestre = FormSemestre.query.get_or_404(cur_sem["formsemestre_id"]) - cur_nt: NotesTableCompat = res_sem.load_formsemestre_results(cur_formsemestre) - if not self.is_apc: - cur_decision = cur_nt.get_etud_decision_sem(etudid) + if self.is_apc: + cur_decision = {} # comp_elt_semestre sera vide. + else: + cur_decision = self.cur_res.get_etud_decision_sem(etudid) if not cur_decision: # pas de decision => pas de résultat annuel return VOID_APO_RES @@ -532,21 +413,17 @@ class ApoEtud(dict): # ne touche pas aux RATs return VOID_APO_RES - if not autre_sem: + if not self.autre_sem: # formations monosemestre, ou code VET semestriel, # ou jury intermediaire et etudiant non redoublant... - return self.comp_elt_semestre(cur_nt, cur_decision, etudid) + return self.comp_elt_semestre(self.cur_res, cur_decision, etudid) - autre_formsemestre = FormSemestre.query.get_or_404(autre_sem["formsemestre_id"]) - autre_nt: NotesTableCompat = res_sem.load_formsemestre_results( - autre_formsemestre - ) # --- Traite le BUT à part: if self.is_apc: - return self.comp_elt_annuel_apc(cur_nt, autre_nt, etudid) + return self.comp_elt_annuel_apc() # --- Formations classiques decision_apo = ScoDocSiteConfig.get_code_apo(cur_decision["code"]) - autre_decision = autre_nt.get_etud_decision_sem(etudid) + autre_decision = self.autre_res.get_etud_decision_sem(etudid) if not autre_decision: # pas de decision dans l'autre => pas de résultat annuel return VOID_APO_RES @@ -562,8 +439,8 @@ class ApoEtud(dict): ): note_str = "0,01" # note non nulle pour les démissionnaires else: - note = cur_nt.get_etud_moy_gen(etudid) - autre_note = autre_nt.get_etud_moy_gen(etudid) + note = self.cur_res.get_etud_moy_gen(etudid) + autre_note = self.autre_res.get_etud_moy_gen(etudid) # print 'note=%s autre_note=%s' % (note, autre_note) try: moy_annuelle = (note + autre_note) / 2 @@ -578,40 +455,46 @@ class ApoEtud(dict): return dict(N=note_str, B=20, J="", R=decision_apo_annuelle, M="") - def comp_elt_annuel_apc( - self, - cur_res: ResultatsSemestreBUT, - autre_res: ResultatsSemestreBUT, - etudid: int, - ): + def comp_elt_annuel_apc(self): """L'élément Apo pour un résultat annuel BUT. - cur_res : les résultats du semestre sur lequel a été appelé l'export. + self.cur_res == résultats du semestre sur lequel a été appelé l'export. """ + if not self.validation_annee_but: + # pas de décision ou pas de sem. impair + return VOID_APO_RES + + return dict( + N="", + B=20, + J="", + R=ScoDocSiteConfig.get_code_apo(self.validation_annee_but.code), + M="", + ) + + def _but_load_validation_annuelle(self): + "charge la validation de jury BUT annuelle" # le semestre impair de l'année scolaire - if cur_res.formsemestre.semestre_id % 2: - formsemestre = cur_res.formsemestre + if self.cur_res.formsemestre.semestre_id % 2: + formsemestre = self.cur_res.formsemestre elif ( - autre_res - and autre_res.formsemestre.annee_scolaire() - == cur_res.formsemestre.annee_scolaire() + self.autre_res + and self.autre_res.formsemestre.annee_scolaire() + == self.cur_res.formsemestre.annee_scolaire() ): - formsemestre = autre_res.formsemestre + formsemestre = self.autre_res.formsemestre assert formsemestre.semestre_id % 2 else: # ne trouve pas de semestre impair - return VOID_APO_RES - - validation: ApcValidationAnnee = ApcValidationAnnee.query.filter_by( - formsemestre_id=formsemestre.id, etudid=etudid - ).first() - if validation is None: - return VOID_APO_RES - return dict( - N="", B=20, J="", R=ScoDocSiteConfig.get_code_apo(validation.code), M="" + self.validation_annee_but = None + return + self.validation_annee_but: ApcValidationAnnee = ( + ApcValidationAnnee.query.filter_by( + formsemestre_id=formsemestre.id, etudid=self.etud["etudid"] + ).first() ) - def etud_semestres_de_etape(self, apo_data): - """ + def etud_set_semestres_de_etape(self, apo_data: "ApoData"): + """Set .cur_sem and .autre_sem et charge les résultats. Lorsqu'on a une formation semestrialisée mais avec un code étape annuel, il faut considérer les deux semestres ((S1,S2) ou (S3,S4)) pour calculer le code annuel (VET ou VRT1A (voir elt_annee_apo)). @@ -619,7 +502,7 @@ class ApoEtud(dict): Pour les jurys intermediaires (janvier, S1 ou S3): (S2 ou S4) de la même étape lors d'une année précédente ? - Renvoie le semestre "courant" et l'autre semestre, ou None s'il n'y en a pas. + Set cur_sem: le semestre "courant" et autre_sem, ou None s'il n'y en a pas. """ # Cherche le semestre "courant": cur_sems = [ @@ -644,19 +527,29 @@ class ApoEtud(dict): cur_sem = None for sem in cur_sems: formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"]) - nt: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) - has_decision = nt.etud_has_decision(self.etud["etudid"]) + res: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) + has_decision = res.etud_has_decision(self.etud["etudid"]) if has_decision: cur_sem = sem + self.cur_res = res break if cur_sem is None: cur_sem = cur_sems[0] # aucun avec décision, prend le plus recent + if res.formsemestre.id == cur_sem["formsemestre_id"]: + self.cur_res = res + else: + formsemestre = FormSemestre.query.get_or_404( + cur_sem["formsemestre_id"] + ) + self.cur_res = res_sem.load_formsemestre_results(formsemestre) + + self.cur_sem = cur_sem if apo_data.cur_semestre_id <= 0: - return ( - cur_sem, - None, - ) # "autre_sem" non pertinent pour sessions sans semestres + # "autre_sem" non pertinent pour sessions sans semestres: + self.autre_sem = None + self.autre_res = None + return if apo_data.jury_intermediaire: # jury de janvier # Le semestre suivant: exemple 2 si on est en jury de S1 @@ -674,7 +567,7 @@ class ApoEtud(dict): courant_annee_debut = apo_data.annee_scolaire + 1 courant_mois_debut = 1 # ou 2 (fev-jul) else: - raise ValueError("invalid pediode value !") # bug ? + raise ValueError("invalid periode value !") # bug ? courant_date_debut = "%d-%02d-01" % ( courant_annee_debut, courant_mois_debut, @@ -705,15 +598,24 @@ class ApoEtud(dict): autre_sem = None for sem in autres_sems: formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"]) - nt: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) - decision = nt.get_etud_decision_sem(self.etud["etudid"]) - if decision: + res: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) + if res.is_apc: + has_decision = res.etud_has_decision(self.etud["etudid"]) + else: + has_decision = res.get_etud_decision_sem(self.etud["etudid"]) + if has_decision: autre_sem = sem break if autre_sem is None: autre_sem = autres_sems[0] # aucun avec decision, prend le plus recent - return cur_sem, autre_sem + self.autre_sem = autre_sem + # Charge les résultats: + if autre_sem: + formsemestre = FormSemestre.query.get_or_404(autre_sem["formsemestre_id"]) + self.autre_res = res_sem.load_formsemestre_results(formsemestre) + else: + self.autre_res = None class ApoData: @@ -746,24 +648,14 @@ class ApoData: "1 sem. sept-jan, 2 sem. fev-jul. 0 si étape en 1 seul semestre." self.is_apc = None "Vrai si BUT" - self.header: str = "" - "début du fichier Apogée (sera ré-écrit non modifié)" - self.titles: dict[str, str] = {} - "titres Apogée (section XX-APO_TITRES-XX)" try: - self.read_csv(data) + self.apo_csv = ApoCSVReadWrite(data) except ScoFormatError as e: - # essaie de retrouver le nom du fichier pour enrichir le message d'erreur - filename = "" - if self.orig_filename is None: - if hasattr(self, "titles"): - filename = self.titles.get("apoC_Fichier_Exp", filename) - else: - filename = self.orig_filename + # enrichit le message d'erreur + filename = self.orig_filename or e.filename raise ScoFormatError( - "

Erreur lecture du fichier Apogée %s

" % filename - + e.args[0] - + "

" + f"""

Erreur lecture du fichier Apogée {filename}

+

{e.args[0]}

""" ) from e self.etape_apogee = self.get_etape_apogee() # 'V1RT' self.vdi_apogee = self.get_vdi_apogee() # '111' @@ -773,12 +665,27 @@ class ApoData: self.jury_intermediaire = ( False # True si jury à mi-étape, eg jury de S1 dans l'étape (S1, S2) ) - + # Crée les étudiants + self.etuds = [ + ApoEtud( + apo_etud_tuple, + export_res_etape=export_res_etape, + export_res_sem=export_res_sem, + export_res_ues=export_res_ues, + export_res_modules=export_res_modules, + export_res_sdj=export_res_sdj, + export_res_rat=export_res_rat, + ) + for apo_etud_tuple in self.apo_csv.csv_etuds + ] + self.etud_by_nip = {apo_etud["nip"]: apo_etud for apo_etud in self.etuds} log(f"ApoData( periode={self.periode}, annee_scolaire={self.annee_scolaire} )") def setup(self): """Recherche semestres ScoDoc concernés""" self.sems_etape = comp_apo_sems(self.etape_apogee, self.annee_scolaire) + if not self.sems_etape: + raise ScoValueError("aucun semestre trouvé !") self.formsemestres_etape = [ FormSemestre.query.get_or_404(s["formsemestre_id"]) for s in self.sems_etape ] @@ -839,196 +746,74 @@ class ApoData: else: self.sems_periode = None - def read_csv(self, data: str): - if not data: - raise ScoFormatError("Fichier Apogée vide !") - f = StringIOFileLineWrapper(data) # pour traiter comme un fichier - # check that we are at the begining of Apogee CSV - line = f.readline().strip() - if line != "XX-APO_TITRES-XX": - raise ScoFormatError("format incorrect: pas de XX-APO_TITRES-XX") - - # 1-- En-tête: du début jusqu'à la balise XX-APO_VALEURS-XX - try: - idx = data.index("XX-APO_VALEURS-XX") - except ValueError as exc: - raise ScoFormatError("format incorrect: pas de XX-APO_VALEURS-XX") from exc - self.header = data[:idx] - - # 2-- Titres: - # on va y chercher apoC_Fichier_Exp qui donnera le nom du fichier - # ainsi que l'année scolaire et le code diplôme. - self.titles = _apo_read_TITRES(f) - - # 3-- La section XX-APO_TYP_RES-XX est ignorée: - line = f.readline().strip() - if line != "XX-APO_TYP_RES-XX": - raise ScoFormatError("format incorrect: pas de XX-APO_TYP_RES-XX") - _apo_skip_section(f) - - # 4-- Définition de colonnes: (on y trouve aussi l'étape) - line = f.readline().strip() - if line != "XX-APO_COLONNES-XX": - raise ScoFormatError("format incorrect: pas de XX-APO_COLONNES-XX") - self.cols = _apo_read_cols(f) - self.apo_elts = self._group_elt_cols(self.cols) - - # 5-- Section XX-APO_VALEURS-XX - # Lecture des étudiants et de leurs résultats - while True: # skip - line = f.readline() - if not line: - raise ScoFormatError("format incorrect: pas de XX-APO_VALEURS-XX") - if line.strip() == "XX-APO_VALEURS-XX": - break - self.column_titles = f.readline() - self.col_ids = self.column_titles.strip().split() - self.etuds = self.apo_read_etuds(f) - self.etud_by_nip = {e["nip"]: e for e in self.etuds} - - def get_etud_by_nip(self, nip): - "returns ApoEtud with a given NIP code" - return self.etud_by_nip[nip] - - def _group_elt_cols(self, cols): - """Return ordered dict of ApoElt from list of ApoCols. - Clé: id apogée, eg 'V1RT', 'V1GE2201', ... - Valeur: ApoElt, avec les attributs code, type_objet - - Si les id Apogée ne sont pas uniques (ce n'est pas garanti), garde le premier - """ - elts = collections.OrderedDict() - for col_id in sorted(list(cols.keys()), reverse=True): - col = cols[col_id] - if col["Code"] in elts: - elts[col["Code"]].append(col) - else: - elts[col["Code"]] = ApoElt([col]) - return elts # { code apo : ApoElt } - - def apo_read_etuds(self, f) -> list[ApoEtud]: - """Lecture des etudiants (et resultats) du fichier CSV Apogée""" - L = [] - while True: - line = f.readline() - if not line: - break - if not line.strip(): - continue # silently ignore blank lines - line = line.strip(APO_NEWLINE) - fields = line.split(APO_SEP) - cols = {} # { col_id : value } - for i, field in enumerate(fields): - cols[self.col_ids[i]] = field - L.append( - ApoEtud( - nip=fields[0], # id etudiant - nom=fields[1], - prenom=fields[2], - naissance=fields[3], - cols=cols, - export_res_etape=self.export_res_etape, - export_res_sem=self.export_res_sem, - export_res_ues=self.export_res_ues, - export_res_modules=self.export_res_modules, - export_res_sdj=self.export_res_sdj, - export_res_rat=self.export_res_rat, - ) - ) - - return L - - def get_etape_apogee(self): + def get_etape_apogee(self) -> str: """Le code etape: 'V1RT', donné par le code de l'élément VET""" - for elt in self.apo_elts.values(): + for elt in self.apo_csv.apo_elts.values(): if elt.type_objet == "VET": return elt.code raise ScoValueError("Pas de code etape Apogee (manque élément VET)") - def get_vdi_apogee(self): + def get_vdi_apogee(self) -> str: """le VDI (version de diplôme), stocké dans l'élément VET (note: on pourrait peut-être aussi bien le récupérer dans l'en-tête XX-APO_TITRES-XX apoC_cod_vdi) """ - for elt in self.apo_elts.values(): + for elt in self.apo_csv.apo_elts.values(): if elt.type_objet == "VET": return elt.version raise ScoValueError("Pas de VDI Apogee (manque élément VET)") - def get_cod_dip_apogee(self): + def get_cod_dip_apogee(self) -> str: """Le code diplôme, indiqué dans l'en-tête de la maquette exemple: VDTRT Retourne '' si absent. """ - return self.titles.get("apoC_cod_dip", "") + return self.apo_csv.titles.get("apoC_cod_dip", "") - def get_annee_scolaire(self): + def get_annee_scolaire(self) -> int: """Annee scolaire du fichier Apogee: un integer = annee du mois de septembre de début """ - m = re.match("[12][0-9]{3}", self.titles["apoC_annee"]) + m = re.match("[12][0-9]{3}", self.apo_csv.titles["apoC_annee"]) if not m: raise ScoFormatError( - f"""Annee scolaire (apoC_annee) invalide: "{self.titles["apoC_annee"]}" """ + f"""Annee scolaire (apoC_annee) invalide: "{self.apo_csv.titles["apoC_annee"]}" """ ) return int(m.group(0)) - def write_header(self, f): - """write apo CSV header on f - (beginning of CSV until columns titles just after XX-APO_VALEURS-XX line) - """ - f.write(self.header) - f.write(APO_NEWLINE) - f.write("XX-APO_VALEURS-XX" + APO_NEWLINE) - f.write(self.column_titles) - - def write_etuds(self, f): - """write apo CSV etuds on f""" - for e in self.etuds: - fields = [] # e['nip'], e['nom'], e['prenom'], e['naissance'] ] - for col_id in self.col_ids: - try: - fields.append(str(e.new_cols[col_id])) - except KeyError: - log( - "Error: %s %s missing column key %s" - % (e["nip"], e["nom"], col_id) - ) - log("Details:\ne = %s" % pprint.pformat(e)) - log("col_ids=%s" % pprint.pformat(self.col_ids)) - log("etudiant ignore.\n") - - f.write(APO_SEP.join(fields) + APO_NEWLINE) - def list_unknown_elements(self) -> list[str]: """Liste des codes des elements Apogee non trouvés dans ScoDoc (après traitement de tous les étudiants) """ codes = set() - for e in self.etuds: - codes.update({code for code in e.col_elts if e.col_elts[code] is None}) + for apo_etud in self.etuds: + codes.update( + {code for code in apo_etud.col_elts if apo_etud.col_elts[code] is None} + ) codes_list = list(codes) codes_list.sort() return codes_list - def list_elements(self): + def list_elements(self) -> tuple[set[str], set[str]]: """Liste les codes des elements Apogée de la maquette et ceux des semestres ScoDoc associés Retourne deux ensembles """ try: - maq_elems = {self.cols[col_id]["Code"] for col_id in self.col_ids[4:]} - except KeyError: + maq_elems = { + self.apo_csv.cols[col_id]["Code"] for col_id in self.apo_csv.col_ids[4:] + } + except KeyError as exc: # une colonne déclarée dans l'en-tête n'est pas présente - declared = self.col_ids[4:] # id des colones dans l'en-tête - present = sorted(self.cols.keys()) # colones presentes + declared = self.apo_csv.col_ids[4:] # id des colones dans l'en-tête + present = sorted(self.apo_csv.cols.keys()) # colonnes présentes log("Fichier Apogee invalide:") log(f"Colonnes declarees: {declared}") log(f"Colonnes presentes: {present}") raise ScoFormatError( f"""Fichier Apogee invalide
Colonnes declarees: {declared}
Colonnes presentes: {present}""" - ) + ) from exc # l'ensemble de tous les codes des elements apo des semestres: sem_elems = reduce(set.union, list(self.get_codes_by_sem().values()), set()) @@ -1057,8 +842,8 @@ class ApoData: ) s = set() codes_by_sem[sem["formsemestre_id"]] = s - for col_id in self.col_ids[4:]: - code = self.cols[col_id]["Code"] # 'V1RT' + for col_id in self.apo_csv.col_ids[4:]: + code = self.apo_csv.cols[col_id]["Code"] # 'V1RT' # associé à l'étape, l'année ou le semestre: if code in codes_semestre: s.add(code) @@ -1075,22 +860,22 @@ class ApoData: def build_cr_table(self): """Table compte rendu des décisions""" - CR = [] # tableau compte rendu des decisions - for e in self.etuds: + rows = [] # tableau compte rendu des decisions + for apo_etud in self.etuds: cr = { - "NIP": e["nip"], - "nom": e["nom"], - "prenom": e["prenom"], - "est_NAR": e.is_NAR, - "commentaire": "; ".join(e.log), + "NIP": apo_etud["nip"], + "nom": apo_etud["nom"], + "prenom": apo_etud["prenom"], + "est_NAR": apo_etud.is_nar, + "commentaire": "; ".join(apo_etud.log), } - if e.col_elts and e.col_elts[self.etape_apogee] is not None: - cr["etape"] = e.col_elts[self.etape_apogee].get("R", "") - cr["etape_note"] = e.col_elts[self.etape_apogee].get("N", "") + if apo_etud.col_elts and apo_etud.col_elts[self.etape_apogee] is not None: + cr["etape"] = apo_etud.col_elts[self.etape_apogee].get("R", "") + cr["etape_note"] = apo_etud.col_elts[self.etape_apogee].get("N", "") else: cr["etape"] = "" cr["etape_note"] = "" - CR.append(cr) + rows.append(cr) columns_ids = ["NIP", "nom", "prenom"] columns_ids.extend(("etape", "etape_note", "est_NAR", "commentaire")) @@ -1098,102 +883,12 @@ class ApoData: T = GenTable( columns_ids=columns_ids, titles=dict(zip(columns_ids, columns_ids)), - rows=CR, + rows=rows, xls_sheet_name="Decisions ScoDoc", ) return T -def _apo_read_cols(f): - """Lecture colonnes apo : - Démarre après la balise XX-APO_COLONNES-XX - et s'arrête après la balise APO_COL_VAL_FIN - - Colonne Apogee: les champs sont données par la ligne - apoL_a01_code de la section XX-APO_COLONNES-XX - col_id est apoL_c0001, apoL_c0002, ... - - :return: { col_id : { title : value } } - Example: { 'apoL_c0001' : { 'Type Objet' : 'VET', 'Code' : 'V1IN', ... }, ... } - """ - line = f.readline().strip(" " + APO_NEWLINE) - fields = line.split(APO_SEP) - if fields[0] != "apoL_a01_code": - raise ScoFormatError(f"invalid line: {line} (expecting apoL_a01_code)") - col_keys = fields - - while True: # skip premiere partie (apoL_a02_nom, ...) - line = f.readline().strip(" " + APO_NEWLINE) - if line == "APO_COL_VAL_DEB": - break - # après APO_COL_VAL_DEB - cols = {} - i = 0 - while True: - line = f.readline().strip(" " + APO_NEWLINE) - if line == "APO_COL_VAL_FIN": - break - i += 1 - fields = line.split(APO_SEP) - # sanity check - col_id = fields[0] # apoL_c0001, ... - if col_id in cols: - raise ScoFormatError(f"duplicate column definition: {col_id}") - m = re.match(r"^apoL_c([0-9]{4})$", col_id) - if not m: - raise ScoFormatError( - f"invalid column id: {line} (expecting apoL_c{col_id})" - ) - if int(m.group(1)) != i: - raise ScoFormatError(f"invalid column id: {col_id} for index {i}") - - cols[col_id] = DictCol(list(zip(col_keys, fields))) - cols[col_id].lineno = f.lineno # for debuging purpose - - return cols - - -def _apo_read_TITRES(f) -> dict: - "Lecture section TITRES du fichier Apogée, renvoie dict" - d = {} - while True: - line = f.readline().strip( - " " + APO_NEWLINE - ) # ne retire pas le \t (pour les clés vides) - if not line.strip(): # stoppe sur ligne pleines de \t - break - - fields = line.split(APO_SEP) - if len(fields) == 2: - k, v = fields - else: - log(f"Error read CSV: \nline={line}\nfields={fields}") - log(dir(f)) - raise ScoFormatError( - f"Fichier Apogee incorrect (section titres, {len(fields)} champs au lieu de 2)" - ) - d[k] = v - # - if not d.get("apoC_Fichier_Exp", None): - raise ScoFormatError("Fichier Apogee incorrect: pas de titre apoC_Fichier_Exp") - # keep only basename: may be a windows or unix pathname - s = d["apoC_Fichier_Exp"].split("/")[-1] - s = s.split("\\")[-1] # for DOS paths, eg C:\TEMP\VL4RT_V3ASR.TXT - d["apoC_Fichier_Exp"] = s - return d - - -def _apo_skip_section(f): - "Saute section Apo: s'arrete apres ligne vide" - while True: - line = f.readline().strip() - if not line: - break - - -# ------------------------------------- - - def comp_apo_sems(etape_apogee, annee_scolaire: int) -> list[dict]: """ :param etape_apogee: etape (string or ApoEtapeVDI) @@ -1205,13 +900,13 @@ def comp_apo_sems(etape_apogee, annee_scolaire: int) -> list[dict]: ) -def nar_etuds_table(apo_data, NAR_Etuds): +def nar_etuds_table(apo_data, nar_etuds): """Liste les NAR -> excel table""" code_etape = apo_data.etape_apogee today = datetime.datetime.today().strftime("%d/%m/%y") rows = [] - NAR_Etuds.sort(key=lambda k: k["nom"]) - for e in NAR_Etuds: + nar_etuds.sort(key=lambda k: k["nom"]) + for e in nar_etuds: rows.append( { "nom": e["nom"], @@ -1290,29 +985,31 @@ def export_csv_to_apogee( export_res_rat=export_res_rat, ) apo_data.setup() # -> .sems_etape - - for e in apo_data.etuds: - e.is_apc = apo_data.is_apc - e.lookup_scodoc(apo_data.etape_formsemestre_ids) - e.associate_sco(apo_data) + apo_csv = apo_data.apo_csv + for apo_etud in apo_data.etuds: + apo_etud.is_apc = apo_data.is_apc + apo_etud.lookup_scodoc(apo_data.etape_formsemestre_ids) + apo_etud.associate_sco(apo_data) # Ré-écrit le fichier Apogée - f = io.StringIO() - apo_data.write_header(f) - apo_data.write_etuds(f) + csv_data = apo_csv.write(apo_data.etuds) # Table des NAR: - NAR_Etuds = [e for e in apo_data.etuds if e.is_NAR] - if NAR_Etuds: - nar_xls = nar_etuds_table(apo_data, NAR_Etuds) + nar_etuds = [apo_etud for apo_etud in apo_data.etuds if apo_etud.is_nar] + if nar_etuds: + nar_xls = nar_etuds_table(apo_data, nar_etuds) else: nar_xls = None # Journaux & Comptes-rendus # Orphelins: etudiants dans fichier Apogée mais pas dans ScoDoc - Apo_Non_ScoDoc = [e for e in apo_data.etuds if e.etat == ETUD_ORPHELIN] + apo_non_scodoc = [ + apo_etud for apo_etud in apo_data.etuds if apo_etud.etat == ETUD_ORPHELIN + ] # Non inscrits: connus de ScoDoc mais pas inscrit dans l'étape cette année - Apo_Non_ScoDoc_Inscrits = [e for e in apo_data.etuds if e.etat == ETUD_NON_INSCRIT] + apo_non_scodoc_inscrits = [ + apo_etud for apo_etud in apo_data.etuds if apo_etud.etat == ETUD_NON_INSCRIT + ] # CR table cr_table = apo_data.build_cr_table() cr_xls = cr_table.excel() @@ -1325,19 +1022,19 @@ def export_csv_to_apogee( else: my_zip = False # Ensure unique filenames - filename = apo_data.titles["apoC_Fichier_Exp"] + filename = apo_csv.get_filename() basename, ext = os.path.splitext(filename) csv_filename = filename if csv_filename in dest_zip.namelist(): basename = filename + "-" + apo_data.vdi_apogee csv_filename = basename + ext - nf = 1 + num_file = 1 tmplname = basename while csv_filename in dest_zip.namelist(): - basename = tmplname + "-%d" % nf + basename = f"{tmplname}-{num_file}" csv_filename = basename + ext - nf += 1 + num_file += 1 log_filename = "scodoc-" + basename + ".log.txt" nar_filename = basename + "-nar" + scu.XLSX_SUFFIX @@ -1361,7 +1058,7 @@ def export_csv_to_apogee( logf.write( "\nÉtudiants Apogée non trouvés dans ScoDoc:\n" + "\n".join( - ["%s\t%s\t%s" % (e["nip"], e["nom"], e["prenom"]) for e in Apo_Non_ScoDoc] + ["%s\t%s\t%s" % (e["nip"], e["nom"], e["prenom"]) for e in apo_non_scodoc] ) ) logf.write( @@ -1369,7 +1066,7 @@ def export_csv_to_apogee( + "\n".join( [ "%s\t%s\t%s" % (e["nip"], e["nom"], e["prenom"]) - for e in Apo_Non_ScoDoc_Inscrits + for e in apo_non_scodoc_inscrits ] ) ) @@ -1380,8 +1077,6 @@ def export_csv_to_apogee( ) log(logf.getvalue()) # sortie aussi sur le log ScoDoc - csv_data = f.getvalue().encode(APO_OUTPUT_ENCODING) - # Write data to ZIP dest_zip.writestr(csv_filename, csv_data) dest_zip.writestr(log_filename, logf.getvalue()) diff --git a/app/scodoc/sco_apogee_reader.py b/app/scodoc/sco_apogee_reader.py new file mode 100644 index 000000000..abe8e2bf3 --- /dev/null +++ b/app/scodoc/sco_apogee_reader.py @@ -0,0 +1,483 @@ +############################################################################## +# +# Gestion scolarite IUT +# +# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Emmanuel Viennet emmanuel.viennet@viennet.net +# +############################################################################## + +"""Lecture du fichier "maquette" Apogée + +Le fichier CSV, champs séparés par des tabulations, a la structure suivante: + +
+ XX-APO_TITRES-XX
+ apoC_annee	2007/2008
+ apoC_cod_dip	VDTCJ
+ apoC_Cod_Exp	1
+ apoC_cod_vdi	111
+ apoC_Fichier_Exp	VDTCJ_V1CJ.txt
+ apoC_lib_dip	DUT CJ
+ apoC_Titre1	Export Apogée du 13/06/2008 à 14:29
+ apoC_Titre2
+
+ XX-APO_TYP_RES-XX
+ ...section optionnelle au contenu quelconque...
+
+ XX-APO_COLONNES-XX
+ apoL_a01_code	Type Objet	Code	Version	Année	Session	Admission/Admissibilité	Type Rés.			Etudiant	Numéro
+ apoL_a02_nom										1	Nom
+ apoL_a03_prenom										1	Prénom
+ apoL_a04_naissance									Session	Admissibilité	Naissance
+ APO_COL_VAL_DEB
+ apoL_c0001	VET	V1CJ	111	2007	0	1	N	V1CJ - DUT CJ an1	0	1	Note
+ apoL_c0002	VET	V1CJ	111	2007	0	1	B		0	1	Barème
+ apoL_c0003	VET	V1CJ	111	2007	0	1	R		0	1	Résultat
+ APO_COL_VAL_FIN
+ apoL_c0030	APO_COL_VAL_FIN
+
+ XX-APO_VALEURS-XX
+ apoL_a01_code	apoL_a02_nom	apoL_a03_prenom	apoL_a04_naissance	apoL_c0001	apoL_c0002	apoL_c0003	apoL_c0004	apoL_c0005	apoL_c0006	apoL_c0007	apoL_c0008	apoL_c0009	apoL_c0010	apoL_c0011	apoL_c0012	apoL_c0013	apoL_c0014	apoL_c0015	apoL_c0016	apoL_c0017	apoL_c0018	apoL_c0019	apoL_c0020	apoL_c0021	apoL_c0022	apoL_c0023	apoL_c0024	apoL_c0025	apoL_c0026	apoL_c0027	apoL_c0028	apoL_c0029
+ 10601232	AARIF	MALIKA	 22/09/1986	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM	18	20	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM
+ 
+ + + On récupère nos éléments pédagogiques dans la section XX-APO-COLONNES-XX et + notre liste d'étudiants dans la section XX-APO_VALEURS-XX. Les champs de la + section XX-APO_VALEURS-XX sont décrits par les lignes successives de la + section XX-APO_COLONNES-XX. + + Le fichier CSV correspond à une étape, qui est récupérée sur la ligne +
+ apoL_c0001	VET	V1CJ ...
+ 
+""" +from collections import namedtuple +import io +import pprint +import re + +# Pour la détection auto de l'encodage des fichiers Apogée: +from chardet import detect as chardet_detect + +from app import log +from app.scodoc.sco_exceptions import ScoFormatError + +APO_PORTAL_ENCODING = ( + "utf8" # encodage du fichier CSV Apogée (était 'ISO-8859-1' avant jul. 2016) +) +APO_INPUT_ENCODING = "ISO-8859-1" # +APO_OUTPUT_ENCODING = APO_INPUT_ENCODING # encodage des fichiers Apogee générés +APO_DECIMAL_SEP = "," # separateur décimal: virgule +APO_SEP = "\t" +APO_NEWLINE = "\r\n" + +ApoEtudTuple = namedtuple("ApoEtudTuple", ("nip", "nom", "prenom", "naissance", "cols")) + + +class DictCol(dict): + "A dict, where we can add attributes" + + +class StringIOWithLineNumber(io.StringIO): + "simple wrapper to use a string as a file with line numbers" + + def __init__(self, data: str): + super().__init__(data) + self.lineno = 0 + + def readline(self): + self.lineno += 1 + return super().readline() + + +class ApoCSVReadWrite: + "Gestion lecture/écriture de fichiers csv Apogée" + + def __init__(self, data: str): + if not data: + raise ScoFormatError("Fichier Apogée vide !") + self.data = data + self._file = StringIOWithLineNumber(data) # pour traiter comme un fichier + self.apo_elts: dict = None + self.cols: dict[str, dict[str, str]] = None + self.column_titles: str = None + self.col_ids: list[str] = None + self.csv_etuds: list[ApoEtudTuple] = [] + # section_str: utilisé pour ré-écrire les headers sans aucune altération + self.sections_str: dict[str, str] = {} + "contenu initial de chaque section" + # self.header: str = "" + # "début du fichier Apogée jusqu'à XX-APO_TYP_RES-XX non inclu (sera ré-écrit non modifié)" + self.header_apo_typ_res: str = "" + "section XX-APO_TYP_RES-XX (qui peut en option ne pas être ré-écrite)" + self.titles: dict[str, str] = {} + "titres Apogée (section XX-APO_TITRES-XX)" + + self.read_sections() + + # Check that we have collected all requested infos: + if not self.header_apo_typ_res: + # on pourrait rendre XX-APO_TYP_RES-XX optionnelle mais mieux vaut vérifier: + raise ScoFormatError( + "format incorrect: pas de XX-APO_TYP_RES-XX", + filename=self.get_filename(), + ) + if self.cols is None: + raise ScoFormatError( + "format incorrect: pas de XX-APO_COLONNES-XX", + filename=self.get_filename(), + ) + if self.column_titles is None: + raise ScoFormatError( + "format incorrect: pas de XX-APO_VALEURS-XX", + filename=self.get_filename(), + ) + + def read_sections(self): + """Lit une à une les sections du fichier Apogée""" + # sanity check: we are at the begining of Apogee CSV + start_pos = self._file.tell() + section = self._file.readline().strip() + if section != "XX-APO_TITRES-XX": + raise ScoFormatError("format incorrect: pas de XX-APO_TITRES-XX") + + while True: + self.read_section(section) + line, end_pos = _apo_next_non_blank_line(self._file) + self.sections_str[section] = self.data[start_pos:end_pos] + if not line: + break + section = line + start_pos = end_pos + + def read_section(self, section_name: str): + """Read a section: _file is on the first line after section title""" + if section_name == "XX-APO_TITRES-XX": + # Titres: + # on va y chercher apoC_Fichier_Exp qui donnera le nom du fichier + # ainsi que l'année scolaire et le code diplôme. + self.titles = self._apo_read_titres(self._file) + elif section_name == "XX-APO_TYP_RES-XX": + self.header_apo_typ_res = _apo_read_typ_res(self._file) + elif section_name == "XX-APO_COLONNES-XX": + self.cols = self.apo_read_cols() + self.apo_elts = self.group_elt_cols(self.cols) + elif section_name == "XX-APO_VALEURS-XX": + # les étudiants + self.apo_read_section_valeurs() + else: + raise ScoFormatError( + f"format incorrect: section inconnue: {section_name}", + filename=self.get_filename(), + ) + + def apo_read_cols(self): + """Lecture colonnes apo : + Démarre après la balise XX-APO_COLONNES-XX + et s'arrête après la ligne suivant la balise APO_COL_VAL_FIN + + Colonne Apogee: les champs sont données par la ligne + apoL_a01_code de la section XX-APO_COLONNES-XX + col_id est apoL_c0001, apoL_c0002, ... + + :return: { col_id : { title : value } } + Example: { 'apoL_c0001' : { 'Type Objet' : 'VET', 'Code' : 'V1IN', ... }, ... } + """ + line = self._file.readline().strip(" " + APO_NEWLINE) + fields = line.split(APO_SEP) + if fields[0] != "apoL_a01_code": + raise ScoFormatError( + f"invalid line: {line} (expecting apoL_a01_code)", + filename=self.get_filename(), + ) + col_keys = fields + + while True: # skip premiere partie (apoL_a02_nom, ...) + line = self._file.readline().strip(" " + APO_NEWLINE) + if line == "APO_COL_VAL_DEB": + break + # après APO_COL_VAL_DEB + cols = {} + i = 0 + while True: + line = self._file.readline().strip(" " + APO_NEWLINE) + if line == "APO_COL_VAL_FIN": + break + i += 1 + fields = line.split(APO_SEP) + # sanity check + col_id = fields[0] # apoL_c0001, ... + if col_id in cols: + raise ScoFormatError( + f"duplicate column definition: {col_id}", + filename=self.get_filename(), + ) + m = re.match(r"^apoL_c([0-9]{4})$", col_id) + if not m: + raise ScoFormatError( + f"invalid column id: {line} (expecting apoL_c{col_id})", + filename=self.get_filename(), + ) + if int(m.group(1)) != i: + raise ScoFormatError( + f"invalid column id: {col_id} for index {i}", + filename=self.get_filename(), + ) + + cols[col_id] = DictCol(list(zip(col_keys, fields))) + cols[col_id].lineno = self._file.lineno # for debuging purpose + + self._file.readline() # skip next line + + return cols + + def group_elt_cols(self, cols) -> dict: + """Return (ordered) dict of ApoElt from list of ApoCols. + Clé: id apogée, eg 'V1RT', 'V1GE2201', ... + Valeur: ApoElt, avec les attributs code, type_objet + + Si les id Apogée ne sont pas uniques (ce n'est pas garanti), garde le premier + """ + elts = {} + for col_id in sorted(list(cols.keys()), reverse=True): + col = cols[col_id] + if col["Code"] in elts: + elts[col["Code"]].append(col) + else: + elts[col["Code"]] = ApoElt([col]) + return elts # { code apo : ApoElt } + + def apo_read_section_valeurs(self): + "traitement de la section XX-APO_VALEURS-XX" + self.column_titles = self._file.readline() + self.col_ids = self.column_titles.strip().split() + self.csv_etuds = self.apo_read_etuds() + + def apo_read_etuds(self) -> list[ApoEtudTuple]: + """Lecture des étudiants (et résultats) du fichier CSV Apogée. + Les lignes "étudiant" commencent toujours par + `12345678 NOM PRENOM 15/05/2003` + le premier code étant le NIP. + """ + etud_tuples = [] + while True: + line = self._file.readline() + # cette section est impérativement la dernière du fichier + # donc on arrête ici: + if not line: + break + if not line.strip(): + continue # silently ignore blank lines + line = line.strip(APO_NEWLINE) + fields = line.split(APO_SEP) + if len(fields) < 4: + raise ScoFormatError( + """Ligne étudiant invalide + (doit commencer par 'NIP NOM PRENOM dd/mm/yyyy')""", + filename=self.get_filename(), + ) + cols = {} # { col_id : value } + for i, field in enumerate(fields): + cols[self.col_ids[i]] = field + etud_tuples.append( + ApoEtudTuple( + nip=fields[0], # id etudiant + nom=fields[1], + prenom=fields[2], + naissance=fields[3], + cols=cols, + ) + # XXX à remettre dans apogee_csv.py + # export_res_etape=self.export_res_etape, + # export_res_sem=self.export_res_sem, + # export_res_ues=self.export_res_ues, + # export_res_modules=self.export_res_modules, + # export_res_sdj=self.export_res_sdj, + # export_res_rat=self.export_res_rat, + # ) + ) + + return etud_tuples + + def _apo_read_titres(self, f) -> dict: + "Lecture section TITRES du fichier Apogée, renvoie dict" + d = {} + while True: + line = f.readline().strip( + " " + APO_NEWLINE + ) # ne retire pas le \t (pour les clés vides) + if not line.strip(): # stoppe sur ligne pleines de \t + break + + fields = line.split(APO_SEP) + if len(fields) == 2: + k, v = fields + else: + log(f"Error read CSV: \nline={line}\nfields={fields}") + log(dir(f)) + raise ScoFormatError( + f"Fichier Apogee incorrect (section titres, {len(fields)} champs au lieu de 2)", + filename=self.get_filename(), + ) + d[k] = v + # + if not d.get("apoC_Fichier_Exp", None): + raise ScoFormatError( + "Fichier Apogee incorrect: pas de titre apoC_Fichier_Exp", + filename=self.get_filename(), + ) + # keep only basename: may be a windows or unix pathname + s = d["apoC_Fichier_Exp"].split("/")[-1] + s = s.split("\\")[-1] # for DOS paths, eg C:\TEMP\VL4RT_V3ASR.TXT + d["apoC_Fichier_Exp"] = s + return d + + def get_filename(self) -> str: + """Le nom du fichier APogée, tel qu'indiqué dans le fichier + ou vide.""" + if self.titles: + return self.titles.get("apoC_Fichier_Exp", "") + return "" + + def write(self, apo_etuds: list["ApoEtud"]) -> bytes: + """Renvoie le contenu actualisé du fichier Apogée""" + f = io.StringIO() + self._write_header(f) + self._write_etuds(f, apo_etuds) + return f.getvalue().encode(APO_OUTPUT_ENCODING) + + def _write_etuds(self, f, apo_etuds: list["ApoEtud"]): + """write apo CSV etuds on f""" + for apo_etud in apo_etuds: + fields = [] # e['nip'], e['nom'], e['prenom'], e['naissance'] ] + for col_id in self.col_ids: + try: + fields.append(str(apo_etud.new_cols[col_id])) + except KeyError: + log( + f"""Error: {apo_etud["nip"]} {apo_etud["nom"]} missing column key {col_id} +Details:\napo_etud = {pprint.pformat(apo_etud)} +col_ids={pprint.pformat(self.col_ids)} +étudiant ignoré. +""" + ) + f.write(APO_SEP.join(fields) + APO_NEWLINE) + + def _write_header(self, f): + """write apo CSV header on f + (beginning of CSV until columns titles just after XX-APO_VALEURS-XX line) + """ + for section, data in self.sections_str.items(): + if section != "XX-APO_VALEURS-XX": + # XXX TODO ici on va filtrer XX-APO_TYP_RES-XX + f.write(data) + + f.write("XX-APO_VALEURS-XX" + APO_NEWLINE) + f.write(self.column_titles) + + +class ApoElt: + """Définition d'un Element Apogée + sur plusieurs colonnes du fichier CSV + """ + + def __init__(self, cols): + assert len(cols) > 0 + assert len(set([c["Code"] for c in cols])) == 1 # colonnes de meme code + assert len(set([c["Type Objet"] for c in cols])) == 1 # colonnes de meme type + self.cols = cols + self.code = cols[0]["Code"] + self.version = cols[0]["Version"] + self.type_objet = cols[0]["Type Objet"] + + def append(self, col): + """ajoute une "colonne" à l'élément""" + assert col["Code"] == self.code + if col["Type Objet"] != self.type_objet: + log( + f"""Warning: ApoElt: duplicate id { + self.code} ({self.type_objet} and {col["Type Objet"]})""" + ) + self.type_objet = col["Type Objet"] + self.cols.append(col) + + def __repr__(self): + return f"ApoElt(code='{self.code}', cols={pprint.pformat(self.cols)})" + + +def guess_data_encoding(text: bytes, threshold=0.6): + """Guess string encoding, using chardet heuristics. + Returns encoding, or None if detection failed (confidence below threshold) + """ + r = chardet_detect(text) + if r["confidence"] < threshold: + return None + else: + return r["encoding"] + + +def fix_data_encoding( + text: bytes, + default_source_encoding=APO_INPUT_ENCODING, + dest_encoding=APO_INPUT_ENCODING, +) -> tuple[bytes, str]: + """Try to ensure that text is using dest_encoding + returns converted text, and a message describing the conversion. + + Raises UnicodeEncodeError en cas de problème, en général liée à + une auto-détection errornée. + """ + message = "" + detected_encoding = guess_data_encoding(text) + if not detected_encoding: + if default_source_encoding != dest_encoding: + message = f"converting from {default_source_encoding} to {dest_encoding}" + text = text.decode(default_source_encoding).encode(dest_encoding) + else: + if detected_encoding != dest_encoding: + message = ( + f"converting from detected {default_source_encoding} to {dest_encoding}" + ) + text = text.decode(detected_encoding).encode(dest_encoding) + return text, message + + +def _apo_read_typ_res(f) -> str: + "Lit la section XX-APO_TYP_RES-XX" + text = "XX-APO_TYP_RES-XX" + APO_NEWLINE + while True: + line = f.readline() + stripped_line = line.strip() + if not stripped_line: + break + text += line + return text + + +def _apo_next_non_blank_line(f: StringIOWithLineNumber) -> tuple[str, int]: + "Ramène prochaine ligne non blanche, stripped, et l'indice de son début" + while True: + pos = f.tell() + line = f.readline() + if not line: + return "", -1 + stripped_line = line.strip() + if stripped_line: + return stripped_line, pos diff --git a/app/scodoc/sco_bulletins_generator.py b/app/scodoc/sco_bulletins_generator.py index ec5d258a7..b122d7a46 100644 --- a/app/scodoc/sco_bulletins_generator.py +++ b/app/scodoc/sco_bulletins_generator.py @@ -167,8 +167,9 @@ class BulletinGenerator: formsemestre_id = self.bul_dict["formsemestre_id"] nomprenom = self.bul_dict["etud"]["nomprenom"] + etat_civil = self.bul_dict["etud"]["etat_civil"] marque_debut_bulletin = sco_pdf.DebutBulletin( - nomprenom, + self.bul_dict["etat_civil"], filigranne=self.bul_dict["filigranne"], footer_content=f"""ScoDoc - Bulletin de {nomprenom} - {time.strftime("%d/%m/%Y %H:%M")}""", ) @@ -211,7 +212,7 @@ class BulletinGenerator: document, author="%s %s (E. Viennet) [%s]" % (sco_version.SCONAME, sco_version.SCOVERSION, self.description), - title=f"""Bulletin {sem["titremois"]} de {nomprenom}""", + title=f"""Bulletin {sem["titremois"]} de {etat_civil}""", subject="Bulletin de note", margins=self.margins, server_name=self.server_name, diff --git a/app/scodoc/sco_etape_apogee.py b/app/scodoc/sco_etape_apogee.py index ef60acca8..01081021e 100644 --- a/app/scodoc/sco_etape_apogee.py +++ b/app/scodoc/sco_etape_apogee.py @@ -76,7 +76,7 @@ import re import app.scodoc.sco_utils as scu from app.scodoc import sco_archives -from app.scodoc import sco_apogee_csv +from app.scodoc import sco_apogee_csv, sco_apogee_reader from app.scodoc.sco_exceptions import ScoValueError @@ -108,7 +108,7 @@ def apo_csv_store(csv_data: str, annee_scolaire, sem_id): # sanity check filesize = len(csv_data) if filesize < 10 or filesize > scu.CONFIG.ETUD_MAX_FILE_SIZE: - raise ScoValueError("Fichier csv de taille invalide ! (%d)" % filesize) + raise ScoValueError(f"Fichier csv de taille invalide ! ({filesize})") if not annee_scolaire: raise ScoValueError("Impossible de déterminer l'année scolaire !") @@ -121,13 +121,13 @@ def apo_csv_store(csv_data: str, annee_scolaire, sem_id): if str(apo_data.etape) in apo_csv_list_stored_etapes(annee_scolaire, sem_id=sem_id): raise ScoValueError( - "Etape %s déjà stockée pour cette année scolaire !" % apo_data.etape + f"Etape {apo_data.etape} déjà stockée pour cette année scolaire !" ) - oid = "%d-%d" % (annee_scolaire, sem_id) - description = "%s;%s;%s" % (str(apo_data.etape), annee_scolaire, sem_id) + oid = f"{annee_scolaire}-{sem_id}" + description = f"""{str(apo_data.etape)};{annee_scolaire};{sem_id}""" archive_id = ApoCSVArchive.create_obj_archive(oid, description) - csv_data_bytes = csv_data.encode(sco_apogee_csv.APO_OUTPUT_ENCODING) + csv_data_bytes = csv_data.encode(sco_apogee_reader.APO_OUTPUT_ENCODING) ApoCSVArchive.store(archive_id, filename, csv_data_bytes) return apo_data.etape @@ -212,7 +212,7 @@ def apo_csv_get(etape_apo="", annee_scolaire="", sem_id="") -> str: data = ApoCSVArchive.get(archive_id, etape_apo + ".csv") # ce fichier a été archivé donc généré par ScoDoc # son encodage est donc APO_OUTPUT_ENCODING - return data.decode(sco_apogee_csv.APO_OUTPUT_ENCODING) + return data.decode(sco_apogee_reader.APO_OUTPUT_ENCODING) # ------------------------------------------------------------------------ diff --git a/app/scodoc/sco_etape_apogee_view.py b/app/scodoc/sco_etape_apogee_view.py index b53fe5e02..c5ae128e8 100644 --- a/app/scodoc/sco_etape_apogee_view.py +++ b/app/scodoc/sco_etape_apogee_view.py @@ -32,13 +32,13 @@ import io from zipfile import ZipFile import flask -from flask import flash, g, request, send_file, url_for +from flask import flash, g, request, Response, send_file, url_for import app.scodoc.sco_utils as scu from app import log from app.models import Formation from app.scodoc import html_sco_header -from app.scodoc import sco_apogee_csv +from app.scodoc import sco_apogee_csv, sco_apogee_reader from app.scodoc import sco_etape_apogee from app.scodoc import sco_formsemestre from app.scodoc import sco_portal_apogee @@ -46,7 +46,7 @@ from app.scodoc import sco_preferences from app.scodoc import sco_semset from app.scodoc import sco_etud from app.scodoc.gen_tables import GenTable -from app.scodoc.sco_apogee_csv import APO_INPUT_ENCODING, APO_OUTPUT_ENCODING +from app.scodoc.sco_apogee_reader import APO_INPUT_ENCODING, APO_OUTPUT_ENCODING from app.scodoc.sco_exceptions import ScoValueError @@ -240,7 +240,11 @@ def apo_semset_maq_status( if semset["jury_ok"]: H.append("""
  • Décisions de jury saisies
  • """) else: - H.append("""
  • Il manque des décisions de jury !
  • """) + H.append( + f"""
  • Il manque de {semset["jury_nb_missing"]} + décision{"s" if semset["jury_nb_missing"] > 1 else ""} + de jury !
  • """ + ) if ok_for_export: H.append("""
  • %d étudiants, prêt pour l'export.
  • """ % len(nips_ok)) @@ -275,11 +279,10 @@ def apo_semset_maq_status( if semset and ok_for_export: H.append( - """ + f""" - + """ - % (semset_id,) ) H.append('
    ') @@ -372,7 +375,7 @@ def apo_semset_maq_status( H.append("
    ") # Aide: H.append( - """ + f"""

    Retour aux ensembles de semestres

    @@ -381,10 +384,12 @@ def apo_semset_maq_status( l'export des résultats après les jurys, puis de remplir et exporter ces fichiers.

    - Les fichiers ("maquettes") Apogée sont de type CSV, du texte codé en %s. + Les fichiers ("maquettes") Apogée sont de type CSV, du texte codé en {APO_INPUT_ENCODING}. +

    +

    On a un fichier par étape Apogée. Pour les obtenir, soit on peut les télécharger + directement (si votre ScoDoc est interfacé avec Apogée), soit se débrouiller pour + exporter le fichier texte depuis Apogée. Son contenu ressemble à cela:

    -

    On a un fichier par étape Apogée. Pour les obtenir, soit on peut les télécharger directement (si votre ScoDoc est interfacé avec Apogée), soit se débrouiller pour exporter le fichier - texte depuis Apogée. Son contenu ressemble à cela:

      XX-APO_TITRES-XX
      apoC_annee	2007/2008
    @@ -427,7 +432,6 @@ def apo_semset_maq_status(
         

    """ - % (APO_INPUT_ENCODING,) ) H.append(html_sco_header.sco_footer()) return "\n".join(H) @@ -446,21 +450,25 @@ def table_apo_csv_list(semset): # Ajoute qq infos pour affichage: csv_data = sco_etape_apogee.apo_csv_get(t["etape_apo"], annee_scolaire, sem_id) apo_data = sco_apogee_csv.ApoData(csv_data, periode=semset["sem_id"]) - t["filename"] = apo_data.titles["apoC_Fichier_Exp"] + t["filename"] = apo_data.apo_csv.titles["apoC_Fichier_Exp"] t["nb_etuds"] = len(apo_data.etuds) t["date_str"] = t["date"].strftime("%d/%m/%Y à %H:%M") - view_link = "view_apo_csv?etape_apo=%s&semset_id=%s" % ( - t["etape_apo"], - semset["semset_id"], + view_link = url_for( + "notes.view_apo_csv", + scodoc_dept=g.scodoc_dept, + etape_apo=t["etape_apo"], + semset_id=semset["semset_id"], ) t["_filename_target"] = view_link t["_etape_apo_target"] = view_link t["suppress"] = scu.icontag( "delete_small_img", border="0", alt="supprimer", title="Supprimer" ) - t["_suppress_target"] = "view_apo_csv_delete?etape_apo=%s&semset_id=%s" % ( - t["etape_apo"], - semset["semset_id"], + t["_suppress_target"] = url_for( + "notes.view_apo_csv_delete", + scodoc_dept=g.scodoc_dept, + etape_apo=t["etape_apo"], + semset_id=semset["semset_id"], ) columns_ids = ["filename", "etape_apo", "date_str", "nb_etuds"] @@ -504,13 +512,16 @@ def view_apo_etuds(semset_id, title="", nip_list="", format="html"): for etud in etuds.values(): etud_sco = sco_etud.get_etud_info(code_nip=etud["nip"], filled=True) if etud_sco: - e = etud_sco[0] etud["inscriptions_scodoc"] = ", ".join( [ - '{s[etapes_apo_str]} (S{s[semestre_id]})'.format( - s=sem, e=e - ) - for sem in e["sems"] + f"""{sem["etapes_apo_str"]} (S{sem["semestre_id"]}) + """ + for sem in etud_sco[0]["sems"] ] ) @@ -534,8 +545,8 @@ def view_scodoc_etuds(semset_id, title="", nip_list="", format="html"): tgt = url_for("scolar.ficheEtud", scodoc_dept=g.scodoc_dept, etudid=e["etudid"]) e["_nom_target"] = tgt e["_prenom_target"] = tgt - e["_nom_td_attrs"] = 'id="%s" class="etudinfo"' % (e["etudid"],) - e["_prenom_td_attrs"] = 'id="pre-%s" class="etudinfo"' % (e["etudid"],) + e["_nom_td_attrs"] = f"""id="{e['etudid']}" class="etudinfo" """ + e["_prenom_td_attrs"] = f"""id="pre-{e['etudid']}" class="etudinfo" """ return _view_etuds_page( semset_id, @@ -546,20 +557,14 @@ def view_scodoc_etuds(semset_id, title="", nip_list="", format="html"): ) -def _view_etuds_page(semset_id, title="", etuds=[], keys=(), format="html"): +def _view_etuds_page( + semset_id: int, title="", etuds: list = None, keys=(), format="html" +) -> str: + "Affiche les étudiants indiqués" # Tri les étudiants par nom: - if etuds: + if etuds: # XXX TODO modifier pour utiliser clé de tri etuds.sort(key=lambda x: (x["nom"], x["prenom"])) - H = [ - html_sco_header.sco_header( - page_title=title, - init_qtip=True, - javascripts=["js/etud_info.js"], - ), - "

    %s

    " % title, - ] - tab = GenTable( titles={ "nip": "Code NIP", @@ -579,14 +584,23 @@ def _view_etuds_page(semset_id, title="", etuds=[], keys=(), format="html"): if format != "html": return tab.make_page(format=format) - H.append(tab.html()) + return f""" + {html_sco_header.sco_header( + page_title=title, + init_qtip=True, + javascripts=["js/etud_info.js"], + )} +

    {title}

    - H.append( - """

    Retour à la page d'export Apogée""" - % semset_id - ) + {tab.html()} - return "\n".join(H) + html_sco_header.sco_footer() +

    Retour à la page d'export Apogée +

    + {html_sco_header.sco_footer()} + """ def view_apo_csv_store(semset_id="", csvfile=None, data: bytes = "", autodetect=False): @@ -603,7 +617,7 @@ def view_apo_csv_store(semset_id="", csvfile=None, data: bytes = "", autodetect= if autodetect: # check encoding (although documentation states that users SHOULD upload LATIN1) - data, message = sco_apogee_csv.fix_data_encoding(data) + data, message = sco_apogee_reader.fix_data_encoding(data) if message: log(f"view_apo_csv_store: {message}") else: @@ -623,7 +637,7 @@ def view_apo_csv_store(semset_id="", csvfile=None, data: bytes = "", autodetect= f""" Erreur: l'encodage du fichier est mal détecté. Essayez sans auto-détection, ou vérifiez le codage et le contenu - du fichier (qui doit être en {sco_apogee_csv.APO_INPUT_ENCODING}). + du fichier (qui doit être en {sco_apogee_reader.APO_INPUT_ENCODING}). """, dest_url=dest_url, ) from exc @@ -631,7 +645,7 @@ def view_apo_csv_store(semset_id="", csvfile=None, data: bytes = "", autodetect= raise ScoValueError( f""" Erreur: l'encodage du fichier est incorrect. - Vérifiez qu'il est bien en {sco_apogee_csv.APO_INPUT_ENCODING} + Vérifiez qu'il est bien en {sco_apogee_reader.APO_INPUT_ENCODING} """, dest_url=dest_url, ) from exc @@ -640,20 +654,20 @@ def view_apo_csv_store(semset_id="", csvfile=None, data: bytes = "", autodetect= apo_data = sco_apogee_csv.ApoData( data_str, periode=semset["sem_id"] ) # parse le fichier -> exceptions + dest_url = url_for( + "notes.apo_semset_maq_status", + scodoc_dept=g.scodoc_dept, + semset_id=semset_id, + ) if apo_data.etape not in semset["etapes"]: raise ScoValueError( - "Le code étape de ce fichier ne correspond pas à ceux de cet ensemble" + "Le code étape de ce fichier ne correspond pas à ceux de cet ensemble", + dest_url=dest_url, ) sco_etape_apogee.apo_csv_store(data_str, semset["annee_scolaire"], semset["sem_id"]) - return flask.redirect( - url_for( - "notes.apo_semset_maq_status", - scodoc_dept=g.scodoc_dept, - semset_id=semset_id, - ) - ) + return flask.redirect(dest_url) def view_apo_csv_download_and_store(etape_apo="", semset_id=""): @@ -679,9 +693,8 @@ def view_apo_csv_delete(etape_apo="", semset_id="", dialog_confirmed=False): dest_url = f"apo_semset_maq_status?semset_id={semset_id}" if not dialog_confirmed: return scu.confirm_dialog( - """

    Confirmer la suppression du fichier étape %s?

    -

    La suppression sera définitive.

    """ - % (etape_apo,), + f"""

    Confirmer la suppression du fichier étape {etape_apo}?

    +

    La suppression sera définitive.

    """, dest_url="", cancel_url=dest_url, parameters={"semset_id": semset_id, "etape_apo": etape_apo}, @@ -727,24 +740,24 @@ def view_apo_csv(etape_apo="", semset_id="", format="html"): H = [ html_sco_header.sco_header( - page_title="Maquette Apogée enregistrée pour %s" % etape_apo, + page_title=f"""Maquette Apogée enregistrée pour {etape_apo}""", init_qtip=True, javascripts=["js/etud_info.js"], ), - """

    Etudiants dans la maquette Apogée %s

    """ % etape_apo, - """

    Pour l'ensemble %(title)s (indice semestre: %(sem_id)s)

    """ - % semset, + f"""

    Étudiants dans la maquette Apogée {etape_apo}

    +

    Pour l'ensemble {semset['title']} (indice semestre: {semset['sem_id']}) +

    +
    +
    Code étape:{ + apo_data.etape_apogee} VDI {apo_data.vdi_apogee} (année {apo_data.annee_scolaire + }) +
    +
    + """, ] - # Infos générales - H.append( - """ -
    -
    Code étape:{0.etape_apogee} VDI {0.vdi_apogee} (année {0.annee_scolaire})
    -
    - """.format( - apo_data - ) - ) # Liste des étudiants (sans les résultats pour le moment): TODO etuds = apo_data.etuds @@ -789,12 +802,21 @@ def view_apo_csv(etape_apo="", semset_id="", format="html"): return tab.make_page(format=format) H += [ - tab.html(), - """

    fichier maquette CSV brut (non rempli par ScoDoc)

    """ - % (etape_apo, semset_id), - """""" - % semset_id, + f""" + {tab.html()} +

    fichier maquette CSV brut (non rempli par ScoDoc) +

    +
    + Retour +
    + """, html_sco_header.sco_footer(), ] @@ -809,7 +831,7 @@ def apo_csv_export_results( block_export_res_ues=False, block_export_res_modules=False, block_export_res_sdj=False, -): +) -> Response: """Remplit les fichiers CSV archivés et donne un ZIP avec tous les résultats. """ @@ -833,31 +855,28 @@ def apo_csv_export_results( periode = semset["sem_id"] data = io.BytesIO() - dest_zip = ZipFile(data, "w") - - etapes_apo = sco_etape_apogee.apo_csv_list_stored_etapes( - annee_scolaire, periode, etapes=semset.list_etapes() - ) - for etape_apo in etapes_apo: - apo_csv = sco_etape_apogee.apo_csv_get(etape_apo, annee_scolaire, periode) - sco_apogee_csv.export_csv_to_apogee( - apo_csv, - periode=periode, - export_res_etape=export_res_etape, - export_res_sem=export_res_sem, - export_res_ues=export_res_ues, - export_res_modules=export_res_modules, - export_res_sdj=export_res_sdj, - export_res_rat=export_res_rat, - dest_zip=dest_zip, + with ZipFile(data, "w") as dest_zip: + etapes_apo = sco_etape_apogee.apo_csv_list_stored_etapes( + annee_scolaire, periode, etapes=semset.list_etapes() ) + for etape_apo in etapes_apo: + apo_csv = sco_etape_apogee.apo_csv_get(etape_apo, annee_scolaire, periode) + sco_apogee_csv.export_csv_to_apogee( + apo_csv, + periode=periode, + export_res_etape=export_res_etape, + export_res_sem=export_res_sem, + export_res_ues=export_res_ues, + export_res_modules=export_res_modules, + export_res_sdj=export_res_sdj, + export_res_rat=export_res_rat, + dest_zip=dest_zip, + ) - dest_zip.close() data.seek(0) basename = ( sco_preferences.get_preference("DeptName") - + str(annee_scolaire) - + "-%s-" % periode + + f"{annee_scolaire}-{periode}-" + "-".join(etapes_apo) ) basename = scu.unescape_html(basename) diff --git a/app/scodoc/sco_etape_bilan.py b/app/scodoc/sco_etape_bilan.py index dc1a50499..ec1a8c8bf 100644 --- a/app/scodoc/sco_etape_bilan.py +++ b/app/scodoc/sco_etape_bilan.py @@ -174,7 +174,7 @@ class DataEtudiant(object): return self.data_apogee["nom"] + self.data_apogee["prenom"] -def help(): +def _help() -> str: return """
    Explications sur les tableaux des effectifs et liste des étudiants @@ -501,7 +501,7 @@ class EtapeBilan: entete_liste_etudiant(), self.table_effectifs(), """""", - help(), + _help(), ] return "\n".join(H) diff --git a/app/scodoc/sco_etud.py b/app/scodoc/sco_etud.py index 5a2313fae..70b7b1636 100644 --- a/app/scodoc/sco_etud.py +++ b/app/scodoc/sco_etud.py @@ -57,7 +57,12 @@ def format_etud_ident(etud): else: etud["nom_usuel"] = "" etud["prenom"] = format_prenom(etud["prenom"]) + if "prenom_etat_civil" in etud: + etud["prenom_etat_civil"] = format_prenom(etud["prenom_etat_civil"]) + else: + etud["prenom_etat_civil"] = "" etud["civilite_str"] = format_civilite(etud["civilite"]) + etud["civilite_etat_civil_str"] = format_civilite(etud["civilite_etat_civil"]) # Nom à afficher: if etud["nom_usuel"]: etud["nom_disp"] = etud["nom_usuel"] @@ -225,7 +230,12 @@ _identiteEditor = ndb.EditableTable( "nom", "nom_usuel", "prenom", + "prenom_etat_civil", + "cas_id", + "cas_allow_login", + "cas_allow_scodoc_login", "civilite", # 'M", "F", or "X" + "civilite_etat_civil", "date_naissance", "lieu_naissance", "dept_naissance", @@ -242,7 +252,9 @@ _identiteEditor = ndb.EditableTable( input_formators={ "nom": force_uppercase, "prenom": force_uppercase, + "prenom_etat_civil": force_uppercase, "civilite": input_civilite, + "civilite_etat_civil": input_civilite, "date_naissance": ndb.DateDMYtoISO, "boursier": bool, }, @@ -263,6 +275,7 @@ def identite_list(cnx, *a, **kw): else: o["annee_naissance"] = o["date_naissance"] o["civilite_str"] = format_civilite(o["civilite"]) + o["civilite_etat_civil_str"] = format_civilite(o["civilite_etat_civil"]) return objs diff --git a/app/scodoc/sco_exceptions.py b/app/scodoc/sco_exceptions.py index a21869405..4d57293c9 100644 --- a/app/scodoc/sco_exceptions.py +++ b/app/scodoc/sco_exceptions.py @@ -29,6 +29,7 @@ """ from flask_login import current_user + # --- Exceptions class ScoException(Exception): "super classe de toutes les exceptions ScoDoc." @@ -44,6 +45,7 @@ class ScoInvalidCSRF(ScoException): class ScoValueError(ScoException): "Exception avec page d'erreur utilisateur, et qui stoque dest_url" + # mal nommée: super classe de toutes les exceptions avec page # d'erreur gentille. def __init__(self, msg, dest_url=None): @@ -75,7 +77,11 @@ class InvalidEtudId(NoteProcessError): class ScoFormatError(ScoValueError): - pass + "Erreur lecture d'un fichier fourni par l'utilisateur" + + def __init__(self, msg, filename="", dest_url=None): + super().__init__(msg, dest_url=dest_url) + self.filename = filename class ScoInvalidParamError(ScoValueError): diff --git a/app/scodoc/sco_formations.py b/app/scodoc/sco_formations.py index fa169c680..88691b9c2 100644 --- a/app/scodoc/sco_formations.py +++ b/app/scodoc/sco_formations.py @@ -127,7 +127,7 @@ def formation_export_dict( ue_dict["apc_niveau_libelle"] = ue.niveau_competence.libelle ue_dict["apc_niveau_annee"] = ue.niveau_competence.annee ue_dict["apc_niveau_ordre"] = ue.niveau_competence.ordre - # Et le parcour: + # Et les parcours: if ue.parcours: ue_dict["parcours"] = [ parcour.to_dict(with_annees=False) for parcour in ue.parcours @@ -268,8 +268,8 @@ def _formation_retreive_refcomp(f_dict: dict) -> int: def _formation_retreive_apc_niveau( referentiel_competence_id: int, ue_dict: dict ) -> int: - """Recherche dans le ref. de comp. un niveau pour cette UE - utilise comme clé (libelle, annee, ordre) + """Recherche dans le ref. de comp. un niveau pour cette UE. + Utilise (libelle, annee, ordre) comme clé. """ libelle = ue_dict.get("apc_niveau_libelle") annee = ue_dict.get("apc_niveau_annee") @@ -367,6 +367,8 @@ def formation_import_xml(doc: str, import_tags=True, use_local_refcomp=False): assert ue if xml_ue_id: ues_old2new[xml_ue_id] = ue_id + # parcours BUT + # TODO XXX # élément optionnel présent dans les exports BUT: ue_reference = ue_info[1].get("reference") if ue_reference: diff --git a/app/scodoc/sco_import_etuds.py b/app/scodoc/sco_import_etuds.py index 110d95d88..2d67c08cd 100644 --- a/app/scodoc/sco_import_etuds.py +++ b/app/scodoc/sco_import_etuds.py @@ -71,6 +71,8 @@ FORMAT_FILE = "format_import_etudiants.txt" ADMISSION_MODIFIABLE_FIELDS = ( "code_nip", "code_ine", + "prenom_etat_civil", + "civilite_etat_civil", "date_naissance", "lieu_naissance", "bac", diff --git a/app/scodoc/sco_page_etud.py b/app/scodoc/sco_page_etud.py index 6ac2b4bc0..b6a3592d6 100644 --- a/app/scodoc/sco_page_etud.py +++ b/app/scodoc/sco_page_etud.py @@ -176,6 +176,18 @@ def ficheEtud(etudid=None): sco_etud.fill_etuds_info([etud_]) # info = etud_ + if etud.prenom_etat_civil: + info["etat_civil"] = ( + "

    Etat-civil: " + + etud.civilite_etat_civil_str + + " " + + etud.prenom_etat_civil + + " " + + etud.nom + + "

    " + ) + else: + info["etat_civil"] = "" info["ScoURL"] = scu.ScoURL() info["authuser"] = authuser info["info_naissance"] = info["date_naissance"] @@ -325,18 +337,17 @@ def ficheEtud(etudid=None): if not sco_permissions_check.can_suppress_annotation(a["id"]): a["dellink"] = "" else: - a["dellink"] = ( - '%s' - % ( - etudid, - a["id"], - scu.icontag( - "delete_img", - border="0", - alt="suppress", - title="Supprimer cette annotation", - ), - ) + a[ + "dellink" + ] = '%s' % ( + etudid, + a["id"], + scu.icontag( + "delete_img", + border="0", + alt="suppress", + title="Supprimer cette annotation", + ), ) author = sco_users.user_info(a["author"]) alist.append( @@ -473,7 +484,7 @@ def ficheEtud(etudid=None):

    %(nomprenom)s (%(inscription)s)

    - +%(etat_civil)s %(emaillink)s
    %(etudfoto)s diff --git a/app/scodoc/sco_preferences.py b/app/scodoc/sco_preferences.py index 9544bbe91..eefe92756 100644 --- a/app/scodoc/sco_preferences.py +++ b/app/scodoc/sco_preferences.py @@ -111,9 +111,9 @@ get_base_preferences(formsemestre_id) """ import flask -from flask import flash, g, request, url_for -# from flask_login import current_user +from flask import current_app, flash, g, request, url_for + from app.models import Departement from app.scodoc import sco_cache @@ -2234,7 +2234,6 @@ class SemPreferences: raise ScoValueError( "sem_preferences.edit doit etre appele sur un semestre !" ) # a bug ! - sem = sco_formsemestre.get_formsemestre(self.formsemestre_id) H = [ html_sco_header.html_sem_header( "Préférences du semestre", diff --git a/app/scodoc/sco_semset.py b/app/scodoc/sco_semset.py index c91ee7620..1a4e68bbd 100644 --- a/app/scodoc/sco_semset.py +++ b/app/scodoc/sco_semset.py @@ -84,15 +84,17 @@ class SemSet(dict): self.semset_id = semset_id self["semset_id"] = semset_id self.sems = [] - self.formsemestre_ids = [] + self.formsemestres = [] # modernisation en cours... + self.is_apc = False + self.formsemestre_ids = set() cnx = ndb.GetDBConnexion() if semset_id: # read existing set - L = semset_list(cnx, args={"semset_id": semset_id}) - if not L: + semsets = semset_list(cnx, args={"semset_id": semset_id}) + if not semsets: raise ScoValueError(f"Ensemble inexistant ! (semset {semset_id})") - self["title"] = L[0]["title"] - self["annee_scolaire"] = L[0]["annee_scolaire"] - self["sem_id"] = L[0]["sem_id"] + self["title"] = semsets[0]["title"] + self["annee_scolaire"] = semsets[0]["annee_scolaire"] + self["sem_id"] = semsets[0]["sem_id"] r = ndb.SimpleDictFetch( "SELECT formsemestre_id FROM notes_semset_formsemestre WHERE semset_id = %(semset_id)s", {"semset_id": semset_id}, @@ -123,8 +125,13 @@ class SemSet(dict): def load_sems(self): """Load formsemestres""" self.sems = [] + self.formsemestres = [] for formsemestre_id in self.formsemestre_ids: - self.sems.append(sco_formsemestre.get_formsemestre(formsemestre_id)) + formsemestre: FormSemestre = FormSemestre.query.get(formsemestre_id) + self.formsemestres.append(formsemestre) + sem = sco_formsemestre.get_formsemestre(formsemestre_id) + self.sems.append(sem) + self["is_apc"] = formsemestre.formation.is_apc() if self.sems: self["date_debut"] = min([sem["date_debut_iso"] for sem in self.sems]) @@ -137,8 +144,15 @@ class SemSet(dict): self["semtitles"] = [sem["titre_num"] for sem in self.sems] # Construction du ou des lien(s) vers le semestre - pattern = '%(titreannee)s' - self["semlinks"] = [(pattern % sem) for sem in self.sems] + self["semlinks"] = [ + f"""{formsemestre.titre_annee()} + """ + for formsemestre in self.formsemestres + ] + self["semtitles_str"] = "
    ".join(self["semlinks"]) def fill_formsemestres(self): @@ -149,6 +163,8 @@ class SemSet(dict): def add(self, formsemestre_id): "Ajoute ce semestre à l'ensemble" + # check for valid formsemestre_id + formsemestre: FormSemestre = FormSemestre.query.get_or_404(formsemestre_id) # check if formsemestre_id in self.formsemestre_ids: return # already there @@ -159,6 +175,17 @@ class SemSet(dict): f"can't add {formsemestre_id} to set {self.semset_id}: incompatible sem_id" ) + if self.formsemestre_ids and formsemestre.formation.is_apc() != self["is_apc"]: + raise ScoValueError( + """On ne peut pas mélanger des semestres BUT/APC + avec des semestres ordinaires dans le même export.""", + dest_url=url_for( + "notes.apo_semset_maq_status", + scodoc_dept=g.scodoc_dept, + semset_id=self.semset_id, + ), + ) + ndb.SimpleQuery( """INSERT INTO notes_semset_formsemestre (formsemestre_id, semset_id) @@ -242,17 +269,28 @@ class SemSet(dict): def load_etuds(self): self["etuds_without_nip"] = set() # etudids self["jury_ok"] = True + self["jury_nb_missing"] = 0 + is_apc = None for sem in self.sems: formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"]) nt: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre) + if is_apc is not None and is_apc != nt.is_apc: + raise ScoValueError( + "Incohérence: semestre APC (BUT) et ordinaires mélangés !" + ) + else: + is_apc = nt.is_apc sem["etuds"] = list(nt.identdict.values()) sem["nips"] = {e["code_nip"] for e in sem["etuds"] if e["code_nip"]} sem["etuds_without_nip"] = { e["etudid"] for e in sem["etuds"] if not e["code_nip"] } self["etuds_without_nip"] |= sem["etuds_without_nip"] - sem["jury_ok"] = nt.all_etuds_have_sem_decisions() + sem["etudids_no_jury"] = nt.etudids_without_decisions() + sem["jury_ok"] = not sem["etudids_no_jury"] self["jury_ok"] &= sem["jury_ok"] + self["jury_nb_missing"] += len(sem["etudids_no_jury"]) + self["is_apc"] = bool(is_apc) def html_descr(self): """Short HTML description""" @@ -272,36 +310,21 @@ class SemSet(dict): ) H.append("

    ") - H.append( - f"""

    Période: -

    - - """ - ) + if self["sem_id"] == 1: + periode = "1re période (S1, S3)" + elif self["sem_id"] == 2: + periode = "2de période (S2, S4)" + else: + periode = "non semestrialisée (LP, ...). Incompatible avec BUT." H.append( - f"

    Etapes: {sco_formsemestre.etapes_apo_str(self.list_etapes())}

    " + f""" +

    Période: {periode}

    +

    Etapes: {sco_formsemestre.etapes_apo_str(self.list_etapes())}

    + +

    Semestres de l'ensemble: