forked from ScoDoc/DocScoDoc
1368 lines
50 KiB
Python
1368 lines
50 KiB
Python
# -*- mode: python -*-
|
|
# -*- coding: utf-8 -*-
|
|
|
|
##############################################################################
|
|
#
|
|
# Gestion scolarite IUT
|
|
#
|
|
# Copyright (c) 1999 - 2022 Emmanuel Viennet. All rights reserved.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Emmanuel Viennet emmanuel.viennet@viennet.net
|
|
#
|
|
##############################################################################
|
|
|
|
"""Exportation des résultats des étudiants vers Apogée.
|
|
|
|
Ce code a été au départ inspiré par les travaux de Damien Mascré, scodoc2apogee (en Java).
|
|
|
|
A utiliser en fin de semestre, après les jury.
|
|
|
|
On communique avec Apogée via des fichiers CSV.
|
|
|
|
Le fichier CSV, champs séparés par des tabulations, a la structure suivante:
|
|
|
|
<pre>
|
|
XX-APO_TITRES-XX
|
|
apoC_annee 2007/2008
|
|
apoC_cod_dip VDTCJ
|
|
apoC_Cod_Exp 1
|
|
apoC_cod_vdi 111
|
|
apoC_Fichier_Exp VDTCJ_V1CJ.txt
|
|
apoC_lib_dip DUT CJ
|
|
apoC_Titre1 Export Apogée du 13/06/2008 à 14:29
|
|
apoC_Titre2
|
|
|
|
XX-APO_COLONNES-XX
|
|
apoL_a01_code Type Objet Code Version Année Session Admission/Admissibilité Type Rés. Etudiant Numéro
|
|
apoL_a02_nom 1 Nom
|
|
apoL_a03_prenom 1 Prénom
|
|
apoL_a04_naissance Session Admissibilité Naissance
|
|
APO_COL_VAL_DEB
|
|
apoL_c0001 VET V1CJ 111 2007 0 1 N V1CJ - DUT CJ an1 0 1 Note
|
|
apoL_c0002 VET V1CJ 111 2007 0 1 B 0 1 Barème
|
|
apoL_c0003 VET V1CJ 111 2007 0 1 R 0 1 Résultat
|
|
APO_COL_VAL_FIN
|
|
apoL_c0030 APO_COL_VAL_FIN
|
|
|
|
XX-APO_VALEURS-XX
|
|
apoL_a01_code apoL_a02_nom apoL_a03_prenom apoL_a04_naissance apoL_c0001 apoL_c0002 apoL_c0003 apoL_c0004 apoL_c0005 apoL_c0006 apoL_c0007 apoL_c0008 apoL_c0009 apoL_c0010 apoL_c0011 apoL_c0012 apoL_c0013 apoL_c0014 apoL_c0015 apoL_c0016 apoL_c0017 apoL_c0018 apoL_c0019 apoL_c0020 apoL_c0021 apoL_c0022 apoL_c0023 apoL_c0024 apoL_c0025 apoL_c0026 apoL_c0027 apoL_c0028 apoL_c0029
|
|
10601232 AARIF MALIKA 22/09/1986 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM 18 20 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM
|
|
</pre>
|
|
|
|
|
|
On récupère nos éléments pédagogiques dans la section XX-APO-COLONNES-XX et
|
|
notre liste d'étudiants dans la section XX-APO_VALEURS-XX. Les champs de la
|
|
section XX-APO_VALEURS-XX sont décrits par les lignes successives de la
|
|
section XX-APO_COLONNES-XX.
|
|
|
|
Le fichier CSV correspond à une étape, qui est récupérée sur la ligne
|
|
<pre>
|
|
apoL_c0001 VET V1CJ ...
|
|
</pre>
|
|
|
|
|
|
XXX A vérifier:
|
|
AJAC car 1 sem. validé et pas de NAR
|
|
|
|
"""
|
|
|
|
import collections
|
|
import datetime
|
|
from functools import reduce
|
|
import functools
|
|
import io
|
|
import os
|
|
import pprint
|
|
import re
|
|
import time
|
|
from zipfile import ZipFile
|
|
|
|
from flask import send_file
|
|
import numpy as np
|
|
|
|
# Pour la détection auto de l'encodage des fichiers Apogée:
|
|
from chardet import detect as chardet_detect
|
|
|
|
from app import log
|
|
from app.comp import res_sem
|
|
from app.comp.res_compat import NotesTableCompat
|
|
from app.models import FormSemestre, Identite
|
|
from app.models.config import ScoDocSiteConfig
|
|
import app.scodoc.sco_utils as scu
|
|
from app.scodoc.sco_exceptions import ScoValueError, ScoFormatError
|
|
from app.scodoc.gen_tables import GenTable
|
|
from app.scodoc.sco_vdi import ApoEtapeVDI
|
|
from app.scodoc.sco_codes_parcours import code_semestre_validant
|
|
from app.scodoc.sco_codes_parcours import (
|
|
DEF,
|
|
DEM,
|
|
NAR,
|
|
RAT,
|
|
)
|
|
from app.scodoc import sco_cursus
|
|
from app.scodoc import sco_formsemestre
|
|
from app.scodoc import sco_etud
|
|
|
|
APO_PORTAL_ENCODING = (
|
|
"utf8" # encodage du fichier CSV Apogée (était 'ISO-8859-1' avant jul. 2016)
|
|
)
|
|
APO_INPUT_ENCODING = "ISO-8859-1" #
|
|
APO_OUTPUT_ENCODING = APO_INPUT_ENCODING # encodage des fichiers Apogee générés
|
|
APO_DECIMAL_SEP = "," # separateur décimal: virgule
|
|
APO_SEP = "\t"
|
|
APO_NEWLINE = "\r\n"
|
|
|
|
|
|
def _apo_fmt_note(note, fmt="%3.2f"):
|
|
"Formatte une note pour Apogée (séparateur décimal: ',')"
|
|
# if not note and isinstance(note, float): changé le 31/1/2022, étrange ?
|
|
# return ""
|
|
try:
|
|
val = float(note)
|
|
except ValueError:
|
|
return ""
|
|
if np.isnan(val):
|
|
return ""
|
|
return (fmt % val).replace(".", APO_DECIMAL_SEP)
|
|
|
|
|
|
def guess_data_encoding(text, threshold=0.6):
|
|
"""Guess string encoding, using chardet heuristics.
|
|
Returns encoding, or None if detection failed (confidence below threshold)
|
|
"""
|
|
r = chardet_detect(text)
|
|
if r["confidence"] < threshold:
|
|
return None
|
|
else:
|
|
return r["encoding"]
|
|
|
|
|
|
def fix_data_encoding(
|
|
text: bytes,
|
|
default_source_encoding=APO_INPUT_ENCODING,
|
|
dest_encoding=APO_INPUT_ENCODING,
|
|
) -> bytes:
|
|
"""Try to ensure that text is using dest_encoding
|
|
returns converted text, and a message describing the conversion.
|
|
"""
|
|
message = ""
|
|
detected_encoding = guess_data_encoding(text)
|
|
if not detected_encoding:
|
|
if default_source_encoding != dest_encoding:
|
|
message = "converting from %s to %s" % (
|
|
default_source_encoding,
|
|
dest_encoding,
|
|
)
|
|
text = text.decode(default_source_encoding).encode(
|
|
dest_encoding
|
|
) # XXX #py3 #sco8 à tester
|
|
else:
|
|
if detected_encoding != dest_encoding:
|
|
message = "converting from detected %s to %s" % (
|
|
detected_encoding,
|
|
dest_encoding,
|
|
)
|
|
text = text.decode(detected_encoding).encode(dest_encoding) # XXX
|
|
return text, message
|
|
|
|
|
|
class StringIOFileLineWrapper(object):
|
|
def __init__(self, data: str):
|
|
self.f = io.StringIO(data)
|
|
self.lineno = 0
|
|
|
|
def close(self):
|
|
return self.f.close()
|
|
|
|
def readline(self):
|
|
self.lineno += 1
|
|
return self.f.readline()
|
|
|
|
|
|
class DictCol(dict):
|
|
"A dict, where we can add attributes"
|
|
pass
|
|
|
|
|
|
class ApoElt(object):
|
|
"""Definition d'un Element Apogee
|
|
sur plusieurs colonnes du fichier CSV
|
|
"""
|
|
|
|
def __init__(self, cols):
|
|
assert len(cols) > 0
|
|
assert len(set([c["Code"] for c in cols])) == 1 # colonnes de meme code
|
|
assert len(set([c["Type Objet"] for c in cols])) == 1 # colonnes de meme type
|
|
self.cols = cols
|
|
self.code = cols[0]["Code"]
|
|
self.version = cols[0]["Version"]
|
|
self.type_objet = cols[0]["Type Objet"]
|
|
|
|
def append(self, col):
|
|
assert col["Code"] == self.code
|
|
if col["Type Objet"] != self.type_objet:
|
|
log(
|
|
"Warning: ApoElt: duplicate id %s (%s and %s)"
|
|
% (self.code, self.type_objet, col["Type Objet"])
|
|
)
|
|
self.type_objet = col["Type Objet"]
|
|
self.cols.append(col)
|
|
|
|
def __repr__(self):
|
|
return "ApoElt(code='%s', cols=%s)" % (self.code, pprint.pformat(self.cols))
|
|
|
|
|
|
class EtuCol(object):
|
|
"""Valeurs colonnes d'un element pour un etudiant"""
|
|
|
|
def __init__(self, nip, apo_elt, init_vals):
|
|
pass # XXX
|
|
|
|
|
|
ETUD_OK = "ok"
|
|
ETUD_ORPHELIN = "orphelin"
|
|
ETUD_NON_INSCRIT = "non_inscrit"
|
|
|
|
VOID_APO_RES = dict(N="", B="", J="", R="", M="")
|
|
|
|
|
|
class ApoEtud(dict):
|
|
"""Étudiant Apogee:"""
|
|
|
|
def __init__(
|
|
self,
|
|
nip="",
|
|
nom="",
|
|
prenom="",
|
|
naissance="",
|
|
cols={},
|
|
export_res_etape=True,
|
|
export_res_sem=True,
|
|
export_res_ues=True,
|
|
export_res_modules=True,
|
|
export_res_sdj=True,
|
|
export_res_rat=True,
|
|
):
|
|
self["nip"] = nip
|
|
self["nom"] = nom
|
|
self["prenom"] = prenom
|
|
self["naissance"] = naissance
|
|
self.cols = cols
|
|
"{ col_id : value } colid = 'apoL_c0001'"
|
|
self.is_apc = None
|
|
"Vrai si BUT"
|
|
self.col_elts = {}
|
|
"{'V1RT': {'R': 'ADM', 'J': '', 'B': 20, 'N': '12.14'}}"
|
|
self.new_cols = {} # { col_id : value to record in csv }
|
|
self.etud: Identite = None
|
|
"etudiant ScoDoc associé"
|
|
self.etat = None # ETUD_OK, ...
|
|
self.is_NAR = False
|
|
"True si NARé dans un semestre"
|
|
self.log = []
|
|
self.has_logged_no_decision = False
|
|
self.export_res_etape = export_res_etape # VET, ...
|
|
self.export_res_sem = export_res_sem # elt_sem_apo
|
|
self.export_res_ues = export_res_ues
|
|
self.export_res_modules = export_res_modules
|
|
self.export_res_sdj = export_res_sdj # export meme si pas de decision de jury
|
|
self.export_res_rat = export_res_rat
|
|
self.fmt_note = functools.partial(
|
|
_apo_fmt_note, fmt=ScoDocSiteConfig.get_code_apo("NOTES_FMT") or "3.2f"
|
|
)
|
|
|
|
def __repr__(self):
|
|
return f"""ApoEtud( nom='{self["nom"]}', nip='{self["nip"]}' )"""
|
|
|
|
def lookup_scodoc(self, etape_formsemestre_ids):
|
|
"""Cherche l'étudiant ScoDoc associé à cet étudiant Apogée.
|
|
S'il n'est pas trouvé (état "orphelin", dans Apo mais pas chez nous),
|
|
met .etud à None.
|
|
Sinon, cherche le semestre, et met l'état à ETUD_OK ou ETUD_NON_INSCRIT.
|
|
"""
|
|
|
|
# futur: #WIP
|
|
# etud: Identite = Identite.query.filter_by(code_nip=self["nip"]).first()
|
|
# self.etud = etud
|
|
etuds = sco_etud.get_etud_info(code_nip=self["nip"], filled=True)
|
|
if not etuds:
|
|
# pas dans ScoDoc
|
|
self.etud = None
|
|
self.log.append("non inscrit dans ScoDoc")
|
|
self.etat = ETUD_ORPHELIN
|
|
else:
|
|
# futur: #WIP
|
|
# formsemestre_ids = {
|
|
# ins.formsemestre_id for ins in etud.formsemestre_inscriptions
|
|
# }
|
|
# in_formsemestre_ids = formsemestre_ids.intersection(etape_formsemestre_ids)
|
|
self.etud = etuds[0]
|
|
# cherche le semestre ScoDoc correspondant à l'un de ceux de l'etape:
|
|
formsemestre_ids = {s["formsemestre_id"] for s in self.etud["sems"]}
|
|
in_formsemestre_ids = formsemestre_ids.intersection(etape_formsemestre_ids)
|
|
if not in_formsemestre_ids:
|
|
self.log.append(
|
|
"connu dans ScoDoc, mais pas inscrit dans un semestre de cette étape"
|
|
)
|
|
self.etat = ETUD_NON_INSCRIT
|
|
else:
|
|
self.etat = ETUD_OK
|
|
|
|
def associate_sco(self, apo_data: "ApoData"):
|
|
"""Recherche les valeurs des éléments Apogée pour cet étudiant
|
|
Set .new_cols
|
|
"""
|
|
self.col_elts = {} # {'V1RT': {'R': 'ADM', 'J': '', 'B': 20, 'N': '12.14'}}
|
|
if self.etat is None:
|
|
self.lookup_scodoc(apo_data.etape_formsemestre_ids)
|
|
if self.etat != ETUD_OK:
|
|
self.new_cols = (
|
|
self.cols
|
|
) # etudiant inconnu, recopie les valeurs existantes dans Apo
|
|
else:
|
|
sco_elts = {} # valeurs trouvées dans ScoDoc code : { N, B, J, R }
|
|
for col_id in apo_data.col_ids[4:]:
|
|
code = apo_data.cols[col_id]["Code"] # 'V1RT'
|
|
el = sco_elts.get(
|
|
code, None
|
|
) # {'R': ADM, 'J': '', 'B': 20, 'N': '12.14'}
|
|
if el is None: # pas déjà trouvé
|
|
cur_sem, autre_sem = self.etud_semestres_de_etape(apo_data)
|
|
for sem in apo_data.sems_etape:
|
|
el = self.search_elt_in_sem(code, sem, cur_sem, autre_sem)
|
|
if el is not None:
|
|
sco_elts[code] = el
|
|
break
|
|
self.col_elts[code] = el
|
|
if el is None:
|
|
self.new_cols[col_id] = self.cols[col_id]
|
|
else:
|
|
try:
|
|
self.new_cols[col_id] = sco_elts[code][
|
|
apo_data.cols[col_id]["Type Rés."]
|
|
]
|
|
except KeyError as exc:
|
|
log(
|
|
f"associate_sco: missing key, etud={self}\ncode='{code}'\netape='{apo_data.etape_apogee}'"
|
|
)
|
|
raise ScoValueError(
|
|
f"""L'élément {code} n'a pas de résultat: peut-être une erreur
|
|
dans les codes sur le programme pédagogique
|
|
(vérifier qu'il est bien associé à une UE ou semestre)?"""
|
|
) from exc
|
|
# recopie les 4 premieres colonnes (nom, ..., naissance):
|
|
for col_id in apo_data.col_ids[:4]:
|
|
self.new_cols[col_id] = self.cols[col_id]
|
|
|
|
# def unassociated_codes(self, apo_data):
|
|
# "list of apo elements for this student without a value in ScoDoc"
|
|
# codes = set([apo_data.cols[col_id].code for col_id in apo_data.col_ids])
|
|
# return codes - set(sco_elts)
|
|
|
|
def search_elt_in_sem(self, code, sem, cur_sem, autre_sem) -> dict:
|
|
"""
|
|
VET code jury etape
|
|
ELP élément pédagogique: UE, module
|
|
Autres éléments: résultats du semestre ou de l'année scolaire:
|
|
=> VRTW1: code additionnel au semestre ("code élement semestre", elt_sem_apo)
|
|
=> VRT1A: le même que le VET: ("code élement annuel", elt_annee_apo)
|
|
Attention, si le semestre couvre plusieurs étapes, indiquer les codes des éléments,
|
|
séparés par des virgules.
|
|
|
|
Args:
|
|
code (str): code apo de l'element cherché
|
|
sem (dict): semestre dans lequel on cherche l'élément
|
|
cur_sem (dict): semestre "courant" pour résultats annuels (VET)
|
|
autre_sem (dict): autre semestre utilisé pour calculé les résultats annuels (VET)
|
|
|
|
Returns:
|
|
dict: with N, B, J, R keys, ou None si elt non trouvé
|
|
"""
|
|
etudid = self.etud["etudid"]
|
|
formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"])
|
|
nt: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre)
|
|
|
|
if etudid not in nt.identdict:
|
|
return None # etudiant non inscrit dans ce semestre
|
|
|
|
decision = nt.get_etud_decision_sem(etudid)
|
|
if not self.export_res_sdj and not decision:
|
|
# pas de decision de jury, on n'enregistre rien
|
|
# (meme si démissionnaire)
|
|
if not self.has_logged_no_decision:
|
|
self.log.append("Pas de decision")
|
|
self.has_logged_no_decision = True
|
|
return VOID_APO_RES
|
|
|
|
if decision and decision["code"] == NAR:
|
|
self.is_NAR = True
|
|
|
|
# Element etape (annuel ou non):
|
|
if sco_formsemestre.sem_has_etape(sem, code) or (
|
|
code in {x.strip() for x in sem["elt_annee_apo"].split(",")}
|
|
):
|
|
export_res_etape = self.export_res_etape
|
|
if (not export_res_etape) and cur_sem:
|
|
# exporte toujours le résultat de l'étape si l'étudiant est diplômé
|
|
Se = sco_cursus.get_situation_etud_cursus(
|
|
self.etud, cur_sem["formsemestre_id"]
|
|
)
|
|
export_res_etape = Se.all_other_validated()
|
|
|
|
if export_res_etape:
|
|
return self.comp_elt_annuel(etudid, cur_sem, autre_sem)
|
|
else:
|
|
return VOID_APO_RES
|
|
|
|
# Element semestre:
|
|
if code in {x.strip() for x in sem["elt_sem_apo"].split(",")}:
|
|
if self.export_res_sem:
|
|
return self.comp_elt_semestre(nt, decision, etudid)
|
|
else:
|
|
return VOID_APO_RES
|
|
|
|
# Elements UE
|
|
decisions_ue = nt.get_etud_decision_ues(etudid)
|
|
for ue in nt.get_ues_stat_dict():
|
|
if ue["code_apogee"] and code in {
|
|
x.strip() for x in ue["code_apogee"].split(",")
|
|
}:
|
|
if self.export_res_ues:
|
|
if decisions_ue and ue["ue_id"] in decisions_ue:
|
|
ue_status = nt.get_etud_ue_status(etudid, ue["ue_id"])
|
|
code_decision_ue = decisions_ue[ue["ue_id"]]["code"]
|
|
return dict(
|
|
N=self.fmt_note(ue_status["moy"] if ue_status else ""),
|
|
B=20,
|
|
J="",
|
|
R=ScoDocSiteConfig.get_code_apo(code_decision_ue),
|
|
M="",
|
|
)
|
|
else:
|
|
return VOID_APO_RES
|
|
else:
|
|
return VOID_APO_RES
|
|
|
|
# Elements Modules
|
|
modimpls = nt.get_modimpls_dict()
|
|
module_code_found = False
|
|
for modimpl in modimpls:
|
|
module = modimpl["module"]
|
|
if module["code_apogee"] and code in {
|
|
x.strip() for x in module["code_apogee"].split(",")
|
|
}:
|
|
n = nt.get_etud_mod_moy(modimpl["moduleimpl_id"], etudid)
|
|
if n != "NI" and self.export_res_modules:
|
|
return dict(N=self.fmt_note(n), B=20, J="", R="")
|
|
else:
|
|
module_code_found = True
|
|
if module_code_found:
|
|
return VOID_APO_RES
|
|
#
|
|
return None # element Apogee non trouvé dans ce semestre
|
|
|
|
def comp_elt_semestre(self, nt, decision, etudid):
|
|
"""Calcul résultat apo semestre"""
|
|
if self.is_apc:
|
|
# pas de code semestre en APC !
|
|
return dict(N="", B=20, J="", R="", M="")
|
|
if decision is None:
|
|
etud = Identite.query.get(etudid)
|
|
nomprenom = etud.nomprenom if etud else "(inconnu)"
|
|
raise ScoValueError(
|
|
f"decision absente pour l'étudiant {nomprenom} ({etudid})"
|
|
)
|
|
# resultat du semestre
|
|
decision_apo = ScoDocSiteConfig.get_code_apo(decision["code"])
|
|
note = nt.get_etud_moy_gen(etudid)
|
|
if decision_apo == "DEF" or decision["code"] == DEM or decision["code"] == DEF:
|
|
note_str = "0,01" # note non nulle pour les démissionnaires
|
|
else:
|
|
note_str = self.fmt_note(note)
|
|
return dict(N=note_str, B=20, J="", R=decision_apo, M="")
|
|
|
|
def comp_elt_annuel(self, etudid, cur_sem, autre_sem):
|
|
"""Calcul resultat annuel (VET) à partir du semestre courant
|
|
et de l'autre (le suivant ou le précédent complétant l'année scolaire)
|
|
"""
|
|
# Code annuel:
|
|
# - Note: moyenne des moyennes générales des deux semestres (pas vraiment de sens, mais faute de mieux)
|
|
# on pourrait aussi bien prendre seulement la note du dernier semestre (S2 ou S4). Paramétrable ?
|
|
# - Résultat jury:
|
|
# si l'autre est validé, code du semestre courant (ex: S1 (ADM), S2 (AJ) => année AJ)
|
|
# si l'autre n'est pas validé ou est DEF ou DEM, code de l'autre
|
|
#
|
|
# XXX cette règle est discutable, à valider
|
|
|
|
# print 'comp_elt_annuel cur_sem=%s autre_sem=%s' % (cur_sem['formsemestre_id'], autre_sem['formsemestre_id'])
|
|
if not cur_sem:
|
|
# l'étudiant n'a pas de semestre courant ?!
|
|
log("comp_elt_annuel: etudid %s has no cur_sem" % etudid)
|
|
return VOID_APO_RES
|
|
cur_formsemestre = FormSemestre.query.get_or_404(cur_sem["formsemestre_id"])
|
|
cur_nt: NotesTableCompat = res_sem.load_formsemestre_results(cur_formsemestre)
|
|
cur_decision = cur_nt.get_etud_decision_sem(etudid)
|
|
if not cur_decision:
|
|
# pas de decision => pas de résultat annuel
|
|
return VOID_APO_RES
|
|
|
|
if (cur_decision["code"] == RAT) and not self.export_res_rat:
|
|
# ne touche pas aux RATs
|
|
return VOID_APO_RES
|
|
|
|
if not autre_sem:
|
|
# formations monosemestre, ou code VET semestriel,
|
|
# ou jury intermediaire et etudiant non redoublant...
|
|
return self.comp_elt_semestre(cur_nt, cur_decision, etudid)
|
|
|
|
decision_apo = ScoDocSiteConfig.get_code_apo(cur_decision["code"])
|
|
|
|
autre_formsemestre = FormSemestre.query.get_or_404(autre_sem["formsemestre_id"])
|
|
autre_nt: NotesTableCompat = res_sem.load_formsemestre_results(
|
|
autre_formsemestre
|
|
)
|
|
autre_decision = autre_nt.get_etud_decision_sem(etudid)
|
|
if not autre_decision:
|
|
# pas de decision dans l'autre => pas de résultat annuel
|
|
return VOID_APO_RES
|
|
autre_decision_apo = ScoDocSiteConfig.get_code_apo(autre_decision["code"])
|
|
if (
|
|
autre_decision_apo == "DEF"
|
|
or autre_decision["code"] == DEM
|
|
or autre_decision["code"] == DEF
|
|
) or (
|
|
decision_apo == "DEF"
|
|
or cur_decision["code"] == DEM
|
|
or cur_decision["code"] == DEF
|
|
):
|
|
note_str = "0,01" # note non nulle pour les démissionnaires
|
|
else:
|
|
note = cur_nt.get_etud_moy_gen(etudid)
|
|
autre_note = autre_nt.get_etud_moy_gen(etudid)
|
|
# print 'note=%s autre_note=%s' % (note, autre_note)
|
|
try:
|
|
moy_annuelle = (note + autre_note) / 2
|
|
except TypeError:
|
|
moy_annuelle = ""
|
|
note_str = self.fmt_note(moy_annuelle)
|
|
|
|
if code_semestre_validant(autre_decision["code"]):
|
|
decision_apo_annuelle = decision_apo
|
|
else:
|
|
decision_apo_annuelle = autre_decision_apo
|
|
|
|
return dict(N=note_str, B=20, J="", R=decision_apo_annuelle, M="")
|
|
|
|
def etud_semestres_de_etape(self, apo_data):
|
|
"""
|
|
Lorsqu'on a une formation semestrialisée mais avec un code étape annuel,
|
|
il faut considérer les deux semestres ((S1,S2) ou (S3,S4)) pour calculer
|
|
le code annuel (VET ou VRT1A (voir elt_annee_apo)).
|
|
|
|
Pour les jurys intermediaires (janvier, S1 ou S3): (S2 ou S4) de la même
|
|
étape lors d'une année précédente ?
|
|
|
|
Renvoie le semestre "courant" et l'autre semestre, ou None s'il n'y en a pas.
|
|
"""
|
|
# Cherche le semestre "courant":
|
|
cur_sems = [
|
|
sem
|
|
for sem in self.etud["sems"]
|
|
if (
|
|
(sem["semestre_id"] == apo_data.cur_semestre_id)
|
|
and (apo_data.etape in sem["etapes"])
|
|
and (
|
|
# sco_formsemestre.sem_in_annee_scolaire(sem, apo_data.annee_scolaire) # TODO à remplacer par ?
|
|
sco_formsemestre.sem_in_semestre_scolaire(
|
|
sem,
|
|
apo_data.annee_scolaire,
|
|
0,
|
|
# jour_pivot_annee,
|
|
# mois_pivot_annee,
|
|
# jour_pivot_periode,
|
|
# mois_pivot_periode
|
|
)
|
|
)
|
|
)
|
|
]
|
|
if not cur_sems:
|
|
cur_sem = None
|
|
else:
|
|
# prend le plus recent avec decision
|
|
cur_sem = None
|
|
for sem in cur_sems:
|
|
formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"])
|
|
nt: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre)
|
|
decision = nt.get_etud_decision_sem(self.etud["etudid"])
|
|
if decision:
|
|
cur_sem = sem
|
|
break
|
|
if cur_sem is None:
|
|
cur_sem = cur_sems[0] # aucun avec decison, prend le plus recent
|
|
|
|
if apo_data.cur_semestre_id <= 0:
|
|
return (
|
|
cur_sem,
|
|
None,
|
|
) # "autre_sem" non pertinent pour sessions sans semestres
|
|
|
|
if apo_data.jury_intermediaire: # jury de janvier
|
|
# Le semestre suivant: exemple 2 si on est en jury de S1
|
|
autre_semestre_id = apo_data.cur_semestre_id + 1
|
|
else:
|
|
# Le précédent (S1 si on est en S2)
|
|
autre_semestre_id = apo_data.cur_semestre_id - 1
|
|
|
|
# L'autre semestre DOIT être antérieur au courant indiqué par apo_data
|
|
if apo_data.periode is not None:
|
|
if apo_data.periode == 1:
|
|
courant_annee_debut = apo_data.annee_scolaire
|
|
courant_mois_debut = 9 # periode = 1 (sept-jan)
|
|
elif apo_data.periode == 2:
|
|
courant_annee_debut = apo_data.annee_scolaire + 1
|
|
courant_mois_debut = 1 # ou 2 (fev-jul)
|
|
else:
|
|
raise ValueError("invalid pediode value !") # bug ?
|
|
courant_date_debut = "%d-%02d-01" % (
|
|
courant_annee_debut,
|
|
courant_mois_debut,
|
|
)
|
|
else:
|
|
courant_date_debut = "9999-99-99"
|
|
|
|
# etud['sems'] est la liste des semestres de l'étudiant, triés par date,
|
|
# le plus récemment effectué en tête.
|
|
# Cherche les semestres (antérieurs) de l'indice autre de la même étape apogée
|
|
# s'il y en a plusieurs, choisit le plus récent ayant une décision
|
|
|
|
autres_sems = []
|
|
for sem in self.etud["sems"]:
|
|
if (
|
|
sem["semestre_id"] == autre_semestre_id
|
|
and apo_data.etape_apogee in sem["etapes"]
|
|
):
|
|
if (
|
|
sem["date_debut_iso"] < courant_date_debut
|
|
): # on demande juste qu'il ait démarré avant
|
|
autres_sems.append(sem)
|
|
if not autres_sems:
|
|
autre_sem = None
|
|
elif len(autres_sems) == 1:
|
|
autre_sem = autres_sems[0]
|
|
else:
|
|
autre_sem = None
|
|
for sem in autres_sems:
|
|
formsemestre = FormSemestre.query.get_or_404(sem["formsemestre_id"])
|
|
nt: NotesTableCompat = res_sem.load_formsemestre_results(formsemestre)
|
|
decision = nt.get_etud_decision_sem(self.etud["etudid"])
|
|
if decision:
|
|
autre_sem = sem
|
|
break
|
|
if autre_sem is None:
|
|
autre_sem = autres_sems[0] # aucun avec decision, prend le plus recent
|
|
|
|
return cur_sem, autre_sem
|
|
|
|
|
|
class ApoData(object):
|
|
def __init__(
|
|
self,
|
|
data: str,
|
|
periode=None,
|
|
export_res_etape=True,
|
|
export_res_sem=True,
|
|
export_res_ues=True,
|
|
export_res_modules=True,
|
|
export_res_sdj=True,
|
|
export_res_rat=True,
|
|
orig_filename=None,
|
|
):
|
|
"""Lecture du fichier CSV Apogée
|
|
Regroupe les élements importants d'un fichier CSV Apogée
|
|
periode = 1 (sept-jan) ou 2 (fev-jul), mais cette info n'est pas
|
|
(toujours) présente dans les CSV Apogée et doit être indiquée par l'utilisateur
|
|
Laisser periode à None si etape en 1 semestre (LP, décalés, ...)
|
|
"""
|
|
self.export_res_etape = export_res_etape # VET, ...
|
|
self.export_res_sem = export_res_sem # elt_sem_apo
|
|
self.export_res_ues = export_res_ues
|
|
self.export_res_modules = export_res_modules
|
|
self.export_res_sdj = export_res_sdj
|
|
self.export_res_rat = export_res_rat
|
|
self.orig_filename = orig_filename
|
|
self.periode = periode #
|
|
self.is_apc = None
|
|
"Vrai si BUT"
|
|
try:
|
|
self.read_csv(data)
|
|
except ScoFormatError as e:
|
|
# essaie de retrouver le nom du fichier pour enrichir le message d'erreur
|
|
filename = ""
|
|
if self.orig_filename is None:
|
|
if hasattr(self, "titles"):
|
|
filename = self.titles.get("apoC_Fichier_Exp", filename)
|
|
else:
|
|
filename = self.orig_filename
|
|
raise ScoFormatError(
|
|
"<h3>Erreur lecture du fichier Apogée <tt>%s</tt></h3><p>" % filename
|
|
+ e.args[0]
|
|
+ "</p>"
|
|
) from e
|
|
self.etape_apogee = self.get_etape_apogee() # 'V1RT'
|
|
self.vdi_apogee = self.get_vdi_apogee() # '111'
|
|
self.etape = ApoEtapeVDI(etape=self.etape_apogee, vdi=self.vdi_apogee)
|
|
self.cod_dip_apogee = self.get_cod_dip_apogee()
|
|
self.annee_scolaire = self.get_annee_scolaire()
|
|
self.jury_intermediaire = (
|
|
False # True si jury à mi-étape, eg jury de S1 dans l'étape (S1, S2)
|
|
)
|
|
|
|
log(
|
|
"ApoData( periode=%s, annee_scolaire=%s )"
|
|
% (self.periode, self.annee_scolaire)
|
|
)
|
|
|
|
def set_periode(self, periode): # currently unused
|
|
self.periode = periode
|
|
|
|
def setup(self):
|
|
"""Recherche semestres ScoDoc concernés"""
|
|
self.sems_etape = comp_apo_sems(self.etape_apogee, self.annee_scolaire)
|
|
self.formsemestres_etape = [
|
|
FormSemestre.query.get_or_404(s["formsemestre_id"]) for s in self.sems_etape
|
|
]
|
|
apcs = {
|
|
formsemestre.formation.is_apc() for formsemestre in self.formsemestres_etape
|
|
}
|
|
if len(apcs) != 1:
|
|
raise ScoValueError(
|
|
"l'ensemble mixe des semestres BUT (APC) et des semestres classiques !"
|
|
)
|
|
self.is_apc = apcs.pop()
|
|
self.etape_formsemestre_ids = {s["formsemestre_id"] for s in self.sems_etape}
|
|
if self.periode is not None:
|
|
self.sems_periode = [
|
|
s
|
|
for s in self.sems_etape
|
|
if (s["periode"] == self.periode) or s["semestre_id"] < 0
|
|
]
|
|
if not self.sems_periode:
|
|
log("** Warning: ApoData.setup: sems_periode is empty")
|
|
log(
|
|
"** (periode=%s, sems_etape [periode]=%s)"
|
|
% (self.periode, [s["periode"] for s in self.sems_etape])
|
|
)
|
|
self.sems_periode = None
|
|
self.cur_semestre_id = -1 # ?
|
|
else:
|
|
self.cur_semestre_id = self.sems_periode[0]["semestre_id"]
|
|
# Les semestres de la période ont le même indice, n'est-ce pas ?
|
|
if not all(
|
|
self.cur_semestre_id == s["semestre_id"] for s in self.sems_periode
|
|
):
|
|
# debugging information
|
|
log(
|
|
f"""*** ApoData.set() error !
|
|
ApoData( periode={self.periode}, annee_scolaire={self.annee_scolaire
|
|
}, cur_semestre_id={self.cur_semestre_id} )
|
|
{len(self.sems_periode)} semestres dans la periode:
|
|
"""
|
|
)
|
|
for s in self.sems_periode:
|
|
log(pprint.pformat(s))
|
|
|
|
raise ScoValueError(
|
|
f"""Incohérence détectée !
|
|
|
|
Les semestres de la période n'ont pas tous le même indice.
|
|
|
|
Période: {self.periode}. Indice courant: {self.cur_semestre_id}
|
|
|
|
(au besoin, contacter l'assistance sur {scu.SCO_DISCORD_ASSISTANCE})
|
|
"""
|
|
)
|
|
# Cette condition sera inadaptée si semestres décalés
|
|
# (mais ils n'ont pas d'étape annuelle, espérons!)
|
|
if self.cur_semestre_id >= 0: # non pertinent pour sessions sans semestres
|
|
self.jury_intermediaire = (self.cur_semestre_id % 2) != 0
|
|
else:
|
|
self.sems_periode = None
|
|
|
|
def read_csv(self, data: str):
|
|
if not data:
|
|
raise ScoFormatError("Fichier Apogée vide !")
|
|
f = StringIOFileLineWrapper(data) # pour traiter comme un fichier
|
|
# check that we are at the begining of Apogee CSV
|
|
line = f.readline().strip()
|
|
if line != "XX-APO_TITRES-XX":
|
|
raise ScoFormatError("format incorrect: pas de XX-APO_TITRES-XX")
|
|
|
|
# 1-- En-tête: du début jusqu'à la balise XX-APO_VALEURS-XX
|
|
try:
|
|
idx = data.index("XX-APO_VALEURS-XX")
|
|
except ValueError as exc:
|
|
raise ScoFormatError("format incorrect: pas de XX-APO_VALEURS-XX") from exc
|
|
self.header = data[:idx]
|
|
|
|
# 2-- Titres:
|
|
# on va y chercher apoC_Fichier_Exp qui donnera le nom du fichier
|
|
# ainsi que l'année scolaire et le code diplôme.
|
|
self.titles = _apo_read_TITRES(f)
|
|
|
|
# 3-- La section XX-APO_TYP_RES-XX est ignorée:
|
|
line = f.readline().strip()
|
|
if line != "XX-APO_TYP_RES-XX":
|
|
raise ScoFormatError("format incorrect: pas de XX-APO_TYP_RES-XX")
|
|
_apo_skip_section(f)
|
|
|
|
# 4-- Définition de colonnes: (on y trouve aussi l'étape)
|
|
line = f.readline().strip()
|
|
if line != "XX-APO_COLONNES-XX":
|
|
raise ScoFormatError("format incorrect: pas de XX-APO_COLONNES-XX")
|
|
self.cols = _apo_read_cols(f)
|
|
self.apo_elts = self._group_elt_cols(self.cols)
|
|
|
|
# 5-- Section XX-APO_VALEURS-XX
|
|
# Lecture des étudiants et de leurs résultats
|
|
while True: # skip
|
|
line = f.readline()
|
|
if not line:
|
|
raise ScoFormatError("format incorrect: pas de XX-APO_VALEURS-XX")
|
|
if line.strip() == "XX-APO_VALEURS-XX":
|
|
break
|
|
self.column_titles = f.readline()
|
|
self.col_ids = self.column_titles.strip().split()
|
|
self.etuds = self.apo_read_etuds(f)
|
|
self.etud_by_nip = {e["nip"]: e for e in self.etuds}
|
|
|
|
def get_etud_by_nip(self, nip):
|
|
"returns ApoEtud with a given NIP code"
|
|
return self.etud_by_nip[nip]
|
|
|
|
def _group_elt_cols(self, cols):
|
|
"""Return ordered dict of ApoElt from list of ApoCols.
|
|
Clé: id apogée, eg 'V1RT', 'V1GE2201', ...
|
|
Valeur: ApoElt, avec les attributs code, type_objet
|
|
|
|
Si les id Apogée ne sont pas uniques (ce n'est pas garanti), garde le premier
|
|
"""
|
|
elts = collections.OrderedDict()
|
|
for col_id in sorted(list(cols.keys()), reverse=True):
|
|
col = cols[col_id]
|
|
if col["Code"] in elts:
|
|
elts[col["Code"]].append(col)
|
|
else:
|
|
elts[col["Code"]] = ApoElt([col])
|
|
return elts # { code apo : ApoElt }
|
|
|
|
def apo_read_etuds(self, f) -> list[ApoEtud]:
|
|
"""Lecture des etudiants (et resultats) du fichier CSV Apogée"""
|
|
L = []
|
|
while True:
|
|
line = f.readline()
|
|
if not line:
|
|
break
|
|
if not line.strip():
|
|
continue # silently ignore blank lines
|
|
line = line.strip(APO_NEWLINE)
|
|
fs = line.split(APO_SEP)
|
|
cols = {} # { col_id : value }
|
|
for i in range(len(fs)):
|
|
cols[self.col_ids[i]] = fs[i]
|
|
L.append(
|
|
ApoEtud(
|
|
nip=fs[0], # id etudiant
|
|
nom=fs[1],
|
|
prenom=fs[2],
|
|
naissance=fs[3],
|
|
cols=cols,
|
|
export_res_etape=self.export_res_etape,
|
|
export_res_sem=self.export_res_sem,
|
|
export_res_ues=self.export_res_ues,
|
|
export_res_modules=self.export_res_modules,
|
|
export_res_sdj=self.export_res_sdj,
|
|
export_res_rat=self.export_res_rat,
|
|
)
|
|
)
|
|
|
|
return L
|
|
|
|
def get_etape_apogee(self):
|
|
"""Le code etape: 'V1RT', donné par le code de l'élément VET"""
|
|
for elt in self.apo_elts.values():
|
|
if elt.type_objet == "VET":
|
|
return elt.code
|
|
raise ScoValueError("Pas de code etape Apogee (manque élément VET)")
|
|
|
|
def get_vdi_apogee(self):
|
|
"""le VDI (version de diplôme), stocké dans l'élément VET
|
|
(note: on pourrait peut-être aussi bien le récupérer dans l'en-tête XX-APO_TITRES-XX apoC_cod_vdi)
|
|
"""
|
|
for elt in self.apo_elts.values():
|
|
if elt.type_objet == "VET":
|
|
return elt.version
|
|
raise ScoValueError("Pas de VDI Apogee (manque élément VET)")
|
|
|
|
def get_cod_dip_apogee(self):
|
|
"""Le code diplôme, indiqué dans l'en-tête de la maquette
|
|
exemple: VDTRT
|
|
Retourne '' si absent.
|
|
"""
|
|
return self.titles.get("apoC_cod_dip", "")
|
|
|
|
def get_annee_scolaire(self):
|
|
"""Annee scolaire du fichier Apogee: un integer
|
|
= annee du mois de septembre de début
|
|
"""
|
|
m = re.match("[12][0-9]{3}", self.titles["apoC_annee"])
|
|
if not m:
|
|
raise ScoFormatError(
|
|
'Annee scolaire (apoC_annee) invalide: "%s"' % self.titles["apoC_annee"]
|
|
)
|
|
return int(m.group(0))
|
|
|
|
def write_header(self, f):
|
|
"""write apo CSV header on f
|
|
(beginning of CSV until columns titles just after XX-APO_VALEURS-XX line)
|
|
"""
|
|
f.write(self.header)
|
|
f.write(APO_NEWLINE)
|
|
f.write("XX-APO_VALEURS-XX" + APO_NEWLINE)
|
|
f.write(self.column_titles)
|
|
|
|
def write_etuds(self, f):
|
|
"""write apo CSV etuds on f"""
|
|
for e in self.etuds:
|
|
fs = [] # e['nip'], e['nom'], e['prenom'], e['naissance'] ]
|
|
for col_id in self.col_ids:
|
|
try:
|
|
fs.append(str(e.new_cols[col_id]))
|
|
except KeyError:
|
|
log(
|
|
"Error: %s %s missing column key %s"
|
|
% (e["nip"], e["nom"], col_id)
|
|
)
|
|
log("Details:\ne = %s" % pprint.pformat(e))
|
|
log("col_ids=%s" % pprint.pformat(self.col_ids))
|
|
log("etudiant ignore.\n")
|
|
|
|
f.write(APO_SEP.join(fs) + APO_NEWLINE)
|
|
|
|
def list_unknown_elements(self):
|
|
"""Liste des codes des elements Apogee non trouvés dans ScoDoc
|
|
(après traitement de tous les étudiants)
|
|
"""
|
|
s = set()
|
|
for e in self.etuds:
|
|
ul = [code for code in e.col_elts if e.col_elts[code] is None]
|
|
s.update(ul)
|
|
L = list(s)
|
|
L.sort()
|
|
return L
|
|
|
|
def list_elements(self):
|
|
"""Liste les codes des elements Apogée de la maquette
|
|
et ceux des semestres ScoDoc associés
|
|
Retourne deux ensembles
|
|
"""
|
|
try:
|
|
maq_elems = {self.cols[col_id]["Code"] for col_id in self.col_ids[4:]}
|
|
except KeyError:
|
|
# une colonne déclarée dans l'en-tête n'est pas présente
|
|
declared = self.col_ids[4:] # id des colones dans l'en-tête
|
|
present = sorted(self.cols.keys()) # colones presentes
|
|
log("Fichier Apogee invalide:")
|
|
log("Colonnes declarees: %s" % declared)
|
|
log("Colonnes presentes: %s" % present)
|
|
raise ScoFormatError(
|
|
"""Fichier Apogee invalide<br>Colonnes declarees: <tt>%s</tt>
|
|
<br>Colonnes presentes: <tt>%s</tt>"""
|
|
% (declared, present)
|
|
)
|
|
# l'ensemble de tous les codes des elements apo des semestres:
|
|
sem_elems = reduce(set.union, list(self.get_codes_by_sem().values()), set())
|
|
|
|
return maq_elems, sem_elems
|
|
|
|
def get_codes_by_sem(self):
|
|
"""Pour chaque semestre associé, donne l'ensemble des codes de cette maquette Apogée
|
|
qui s'y trouvent (dans le semestre, les UE ou les modules).
|
|
Return: { formsemestre_id : { 'code1', 'code2', ... }}
|
|
"""
|
|
codes_by_sem = {}
|
|
for sem in self.sems_etape:
|
|
formsemestre: FormSemestre = FormSemestre.query.get_or_404(
|
|
sem["formsemestre_id"]
|
|
)
|
|
# L'ensemble des codes apo associés aux éléments:
|
|
codes_semestre = formsemestre.get_codes_apogee()
|
|
codes_modules = set().union(
|
|
*[
|
|
modimpl.module.get_codes_apogee()
|
|
for modimpl in formsemestre.modimpls
|
|
]
|
|
)
|
|
codes_ues = set().union(
|
|
*[
|
|
ue.get_codes_apogee()
|
|
for ue in formsemestre.query_ues(with_sport=True)
|
|
]
|
|
)
|
|
s = set()
|
|
codes_by_sem[sem["formsemestre_id"]] = s
|
|
for col_id in self.col_ids[4:]:
|
|
code = self.cols[col_id]["Code"] # 'V1RT'
|
|
# associé à l'étape, l'année ou le semestre:
|
|
if code in codes_semestre:
|
|
s.add(code)
|
|
continue
|
|
# associé à une UE:
|
|
if code in codes_ues:
|
|
s.add(code)
|
|
continue
|
|
# associé à un module:
|
|
if code in codes_modules:
|
|
s.add(code)
|
|
# log('codes_by_sem=%s' % pprint.pformat(codes_by_sem))
|
|
return codes_by_sem
|
|
|
|
def build_cr_table(self):
|
|
"""Table compte rendu des décisions"""
|
|
CR = [] # tableau compte rendu des decisions
|
|
for e in self.etuds:
|
|
cr = {
|
|
"NIP": e["nip"],
|
|
"nom": e["nom"],
|
|
"prenom": e["prenom"],
|
|
"est_NAR": e.is_NAR,
|
|
"commentaire": "; ".join(e.log),
|
|
}
|
|
if e.col_elts and e.col_elts[self.etape_apogee] != None:
|
|
cr["etape"] = e.col_elts[self.etape_apogee].get("R", "")
|
|
cr["etape_note"] = e.col_elts[self.etape_apogee].get("N", "")
|
|
else:
|
|
cr["etape"] = ""
|
|
cr["etape_note"] = ""
|
|
CR.append(cr)
|
|
|
|
columns_ids = ["NIP", "nom", "prenom"]
|
|
columns_ids.extend(("etape", "etape_note", "est_NAR", "commentaire"))
|
|
|
|
T = GenTable(
|
|
columns_ids=columns_ids,
|
|
titles=dict(zip(columns_ids, columns_ids)),
|
|
rows=CR,
|
|
xls_sheet_name="Decisions ScoDoc",
|
|
)
|
|
return T
|
|
|
|
|
|
def _apo_read_cols(f):
|
|
"""Lecture colonnes apo :
|
|
Démarre après la balise XX-APO_COLONNES-XX
|
|
et s'arrête après la balise APO_COL_VAL_FIN
|
|
|
|
Colonne Apogee: les champs sont données par la ligne
|
|
apoL_a01_code de la section XX-APO_COLONNES-XX
|
|
col_id est apoL_c0001, apoL_c0002, ...
|
|
|
|
:return: { col_id : { title : value } }
|
|
Example: { 'apoL_c0001' : { 'Type Objet' : 'VET', 'Code' : 'V1IN', ... }, ... }
|
|
"""
|
|
line = f.readline().strip(" " + APO_NEWLINE)
|
|
fs = line.split(APO_SEP)
|
|
if fs[0] != "apoL_a01_code":
|
|
raise ScoFormatError("invalid line: %s (expecting apoL_a01_code)" % line)
|
|
col_keys = fs
|
|
|
|
while True: # skip premiere partie (apoL_a02_nom, ...)
|
|
line = f.readline().strip(" " + APO_NEWLINE)
|
|
if line == "APO_COL_VAL_DEB":
|
|
break
|
|
# après APO_COL_VAL_DEB
|
|
cols = {}
|
|
i = 0
|
|
while True:
|
|
line = f.readline().strip(" " + APO_NEWLINE)
|
|
if line == "APO_COL_VAL_FIN":
|
|
break
|
|
i += 1
|
|
fs = line.split(APO_SEP)
|
|
# print fs[0], len(fs)
|
|
# sanity check
|
|
col_id = fs[0] # apoL_c0001, ...
|
|
if col_id in cols:
|
|
raise ScoFormatError("duplicate column definition: %s" % col_id)
|
|
m = re.match(r"^apoL_c([0-9]{4})$", col_id)
|
|
if not m:
|
|
raise ScoFormatError(
|
|
"invalid column id: %s (expecting apoL_c%04d)" % (line, col_id)
|
|
)
|
|
if int(m.group(1)) != i:
|
|
raise ScoFormatError("invalid column id: %s for index %s" % (col_id, i))
|
|
|
|
cols[col_id] = DictCol(list(zip(col_keys, fs)))
|
|
cols[col_id].lineno = f.lineno # for debuging purpose
|
|
|
|
return cols
|
|
|
|
|
|
def _apo_read_TITRES(f):
|
|
"Lecture section TITRES du fichier Apogée, renvoie dict"
|
|
d = {}
|
|
while True:
|
|
line = f.readline().strip(
|
|
" " + APO_NEWLINE
|
|
) # ne retire pas le \t (pour les clés vides)
|
|
if not line.strip(): # stoppe sur ligne pleines de \t
|
|
break
|
|
|
|
fields = line.split(APO_SEP)
|
|
if len(fields) == 2:
|
|
k, v = fields
|
|
else:
|
|
log("Error read CSV: \nline=%s\nfields=%s" % (line, fields))
|
|
log(dir(f))
|
|
raise ScoFormatError(
|
|
"Fichier Apogee incorrect (section titres, %d champs au lieu de 2)"
|
|
% len(fields)
|
|
)
|
|
d[k] = v
|
|
#
|
|
if not d.get("apoC_Fichier_Exp", None):
|
|
raise ScoFormatError("Fichier Apogee incorrect: pas de titre apoC_Fichier_Exp")
|
|
# keep only basename: may be a windows or unix pathname
|
|
s = d["apoC_Fichier_Exp"].split("/")[-1]
|
|
s = s.split("\\")[-1] # for DOS paths, eg C:\TEMP\VL4RT_V3ASR.TXT
|
|
d["apoC_Fichier_Exp"] = s
|
|
return d
|
|
|
|
|
|
def _apo_skip_section(f):
|
|
"Saute section Apo: s'arrete apres ligne vide"
|
|
while True:
|
|
line = f.readline().strip()
|
|
if not line:
|
|
break
|
|
|
|
|
|
# -------------------------------------
|
|
|
|
|
|
def comp_apo_sems(etape_apogee, annee_scolaire: int) -> list[dict]:
|
|
"""
|
|
:param etape_apogee: etape (string or ApoEtapeVDI)
|
|
:param annee_scolaire: annee (int)
|
|
:return: list of sems for etape_apogee in annee_scolaire
|
|
"""
|
|
return sco_formsemestre.list_formsemestre_by_etape(
|
|
etape_apo=str(etape_apogee), annee_scolaire=annee_scolaire
|
|
)
|
|
|
|
|
|
def nar_etuds_table(apo_data, NAR_Etuds):
|
|
"""Liste les NAR -> excel table"""
|
|
code_etape = apo_data.etape_apogee
|
|
today = datetime.datetime.today().strftime("%d/%m/%y")
|
|
L = []
|
|
NAR_Etuds.sort(key=lambda k: k["nom"])
|
|
for e in NAR_Etuds:
|
|
L.append(
|
|
{
|
|
"nom": e["nom"],
|
|
"prenom": e["prenom"],
|
|
"c0": "",
|
|
"c1": "AD",
|
|
"etape": code_etape,
|
|
"c3": "",
|
|
"c4": "",
|
|
"c5": "",
|
|
"c6": "N",
|
|
"c7": "",
|
|
"c8": "",
|
|
"NIP": e["nip"],
|
|
"c10": "",
|
|
"c11": "",
|
|
"c12": "",
|
|
"c13": "NAR - Jury",
|
|
"date": today,
|
|
}
|
|
)
|
|
|
|
columns_ids = (
|
|
"NIP",
|
|
"nom",
|
|
"prenom",
|
|
"etape",
|
|
"c0",
|
|
"c1",
|
|
"c3",
|
|
"c4",
|
|
"c5",
|
|
"c6",
|
|
"c7",
|
|
"c8",
|
|
"c10",
|
|
"c11",
|
|
"c12",
|
|
"c13",
|
|
"date",
|
|
)
|
|
T = GenTable(
|
|
columns_ids=columns_ids,
|
|
titles=dict(zip(columns_ids, columns_ids)),
|
|
rows=L,
|
|
xls_sheet_name="NAR ScoDoc",
|
|
)
|
|
return T.excel()
|
|
|
|
|
|
def export_csv_to_apogee(
|
|
apo_csv_data: str,
|
|
periode=None,
|
|
dest_zip=None,
|
|
export_res_etape=True,
|
|
export_res_sem=True,
|
|
export_res_ues=True,
|
|
export_res_modules=True,
|
|
export_res_sdj=True,
|
|
export_res_rat=True,
|
|
):
|
|
"""Genere un fichier CSV Apogée
|
|
à partir d'un fichier CSV Apogée vide (ou partiellement rempli)
|
|
et des résultats ScoDoc.
|
|
Si dest_zip, ajoute les fichiers générés à ce zip
|
|
sinon crée un zip et le publie
|
|
"""
|
|
apo_data = ApoData(
|
|
apo_csv_data,
|
|
periode=periode,
|
|
export_res_etape=export_res_etape,
|
|
export_res_sem=export_res_sem,
|
|
export_res_ues=export_res_ues,
|
|
export_res_modules=export_res_modules,
|
|
export_res_sdj=export_res_sdj,
|
|
export_res_rat=export_res_rat,
|
|
)
|
|
apo_data.setup() # -> .sems_etape
|
|
|
|
for e in apo_data.etuds:
|
|
e.is_apc = apo_data.is_apc
|
|
e.lookup_scodoc(apo_data.etape_formsemestre_ids)
|
|
e.associate_sco(apo_data)
|
|
|
|
# Ré-écrit le fichier Apogée
|
|
f = io.StringIO()
|
|
apo_data.write_header(f)
|
|
apo_data.write_etuds(f)
|
|
|
|
# Table des NAR:
|
|
NAR_Etuds = [e for e in apo_data.etuds if e.is_NAR]
|
|
if NAR_Etuds:
|
|
nar_xls = nar_etuds_table(apo_data, NAR_Etuds)
|
|
else:
|
|
nar_xls = None
|
|
|
|
# Journaux & Comptes-rendus
|
|
# Orphelins: etudiants dans fichier Apogée mais pas dans ScoDoc
|
|
Apo_Non_ScoDoc = [e for e in apo_data.etuds if e.etat == ETUD_ORPHELIN]
|
|
# Non inscrits: connus de ScoDoc mais pas inscrit dans l'étape cette année
|
|
Apo_Non_ScoDoc_Inscrits = [e for e in apo_data.etuds if e.etat == ETUD_NON_INSCRIT]
|
|
# CR table
|
|
cr_table = apo_data.build_cr_table()
|
|
cr_xls = cr_table.excel()
|
|
|
|
# Create ZIP
|
|
if not dest_zip:
|
|
data = io.BytesIO()
|
|
dest_zip = ZipFile(data, "w")
|
|
my_zip = True
|
|
else:
|
|
my_zip = False
|
|
# Ensure unique filenames
|
|
filename = apo_data.titles["apoC_Fichier_Exp"]
|
|
basename, ext = os.path.splitext(filename)
|
|
csv_filename = filename
|
|
|
|
if csv_filename in dest_zip.namelist():
|
|
basename = filename + "-" + apo_data.vdi_apogee
|
|
csv_filename = basename + ext
|
|
nf = 1
|
|
tmplname = basename
|
|
while csv_filename in dest_zip.namelist():
|
|
basename = tmplname + "-%d" % nf
|
|
csv_filename = basename + ext
|
|
nf += 1
|
|
|
|
log_filename = "scodoc-" + basename + ".log.txt"
|
|
nar_filename = basename + "-nar" + scu.XLSX_SUFFIX
|
|
cr_filename = basename + "-decisions" + scu.XLSX_SUFFIX
|
|
|
|
logf = io.StringIO()
|
|
logf.write("export_to_apogee du %s\n\n" % time.ctime())
|
|
logf.write("Semestres ScoDoc sources:\n")
|
|
for sem in apo_data.sems_etape:
|
|
logf.write("\t%(titremois)s\n" % sem)
|
|
logf.write("Periode: %s\n" % periode)
|
|
logf.write("export_res_etape: %s\n" % int(export_res_etape))
|
|
logf.write("export_res_sem: %s\n" % int(export_res_sem))
|
|
logf.write("export_res_ues: %s\n" % int(export_res_ues))
|
|
logf.write("export_res_modules: %s\n" % int(export_res_modules))
|
|
logf.write("export_res_sdj: %s\n" % int(export_res_sdj))
|
|
logf.write(
|
|
"\nEtudiants Apogee non trouves dans ScoDoc:\n"
|
|
+ "\n".join(
|
|
["%s\t%s\t%s" % (e["nip"], e["nom"], e["prenom"]) for e in Apo_Non_ScoDoc]
|
|
)
|
|
)
|
|
logf.write(
|
|
"\nEtudiants Apogee non inscrits sur ScoDoc dans cette étape:\n"
|
|
+ "\n".join(
|
|
[
|
|
"%s\t%s\t%s" % (e["nip"], e["nom"], e["prenom"])
|
|
for e in Apo_Non_ScoDoc_Inscrits
|
|
]
|
|
)
|
|
)
|
|
|
|
logf.write(
|
|
"\n\nElements Apogee inconnus dans ces semestres ScoDoc:\n"
|
|
+ "\n".join(apo_data.list_unknown_elements())
|
|
)
|
|
log(logf.getvalue()) # sortie aussi sur le log ScoDoc
|
|
|
|
csv_data = f.getvalue().encode(APO_OUTPUT_ENCODING)
|
|
|
|
# Write data to ZIP
|
|
dest_zip.writestr(csv_filename, csv_data)
|
|
dest_zip.writestr(log_filename, logf.getvalue())
|
|
if nar_xls:
|
|
dest_zip.writestr(nar_filename, nar_xls)
|
|
dest_zip.writestr(cr_filename, cr_xls)
|
|
|
|
if my_zip:
|
|
dest_zip.close()
|
|
data.seek(0)
|
|
return send_file(
|
|
data,
|
|
mimetype="application/zip",
|
|
download_name=scu.sanitize_filename(basename + "-scodoc.zip"),
|
|
as_attachment=True,
|
|
)
|
|
else:
|
|
return None # zip modified in place
|