484 lines
18 KiB
Python
484 lines
18 KiB
Python
##############################################################################
|
|
#
|
|
# Gestion scolarite IUT
|
|
#
|
|
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Emmanuel Viennet emmanuel.viennet@viennet.net
|
|
#
|
|
##############################################################################
|
|
|
|
"""Lecture du fichier "maquette" Apogée
|
|
|
|
Le fichier CSV, champs séparés par des tabulations, a la structure suivante:
|
|
|
|
<pre>
|
|
XX-APO_TITRES-XX
|
|
apoC_annee 2007/2008
|
|
apoC_cod_dip VDTCJ
|
|
apoC_Cod_Exp 1
|
|
apoC_cod_vdi 111
|
|
apoC_Fichier_Exp VDTCJ_V1CJ.txt
|
|
apoC_lib_dip DUT CJ
|
|
apoC_Titre1 Export Apogée du 13/06/2008 à 14:29
|
|
apoC_Titre2
|
|
|
|
XX-APO_TYP_RES-XX
|
|
...section optionnelle au contenu quelconque...
|
|
|
|
XX-APO_COLONNES-XX
|
|
apoL_a01_code Type Objet Code Version Année Session Admission/Admissibilité Type Rés. Etudiant Numéro
|
|
apoL_a02_nom 1 Nom
|
|
apoL_a03_prenom 1 Prénom
|
|
apoL_a04_naissance Session Admissibilité Naissance
|
|
APO_COL_VAL_DEB
|
|
apoL_c0001 VET V1CJ 111 2007 0 1 N V1CJ - DUT CJ an1 0 1 Note
|
|
apoL_c0002 VET V1CJ 111 2007 0 1 B 0 1 Barème
|
|
apoL_c0003 VET V1CJ 111 2007 0 1 R 0 1 Résultat
|
|
APO_COL_VAL_FIN
|
|
apoL_c0030 APO_COL_VAL_FIN
|
|
|
|
XX-APO_VALEURS-XX
|
|
apoL_a01_code apoL_a02_nom apoL_a03_prenom apoL_a04_naissance apoL_c0001 apoL_c0002 apoL_c0003 apoL_c0004 apoL_c0005 apoL_c0006 apoL_c0007 apoL_c0008 apoL_c0009 apoL_c0010 apoL_c0011 apoL_c0012 apoL_c0013 apoL_c0014 apoL_c0015 apoL_c0016 apoL_c0017 apoL_c0018 apoL_c0019 apoL_c0020 apoL_c0021 apoL_c0022 apoL_c0023 apoL_c0024 apoL_c0025 apoL_c0026 apoL_c0027 apoL_c0028 apoL_c0029
|
|
10601232 AARIF MALIKA 22/09/1986 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM 18 20 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM
|
|
</pre>
|
|
|
|
|
|
On récupère nos éléments pédagogiques dans la section XX-APO-COLONNES-XX et
|
|
notre liste d'étudiants dans la section XX-APO_VALEURS-XX. Les champs de la
|
|
section XX-APO_VALEURS-XX sont décrits par les lignes successives de la
|
|
section XX-APO_COLONNES-XX.
|
|
|
|
Le fichier CSV correspond à une étape, qui est récupérée sur la ligne
|
|
<pre>
|
|
apoL_c0001 VET V1CJ ...
|
|
</pre>
|
|
"""
|
|
from collections import namedtuple
|
|
import io
|
|
import pprint
|
|
import re
|
|
|
|
# Pour la détection auto de l'encodage des fichiers Apogée:
|
|
from chardet import detect as chardet_detect
|
|
|
|
from app import log
|
|
from app.scodoc.sco_exceptions import ScoFormatError
|
|
|
|
APO_PORTAL_ENCODING = (
|
|
"utf8" # encodage du fichier CSV Apogée (était 'ISO-8859-1' avant jul. 2016)
|
|
)
|
|
APO_INPUT_ENCODING = "ISO-8859-1" #
|
|
APO_OUTPUT_ENCODING = APO_INPUT_ENCODING # encodage des fichiers Apogee générés
|
|
APO_DECIMAL_SEP = "," # separateur décimal: virgule
|
|
APO_SEP = "\t"
|
|
APO_NEWLINE = "\r\n"
|
|
|
|
ApoEtudTuple = namedtuple("ApoEtudTuple", ("nip", "nom", "prenom", "naissance", "cols"))
|
|
|
|
|
|
class DictCol(dict):
|
|
"A dict, where we can add attributes"
|
|
|
|
|
|
class StringIOWithLineNumber(io.StringIO):
|
|
"simple wrapper to use a string as a file with line numbers"
|
|
|
|
def __init__(self, data: str):
|
|
super().__init__(data)
|
|
self.lineno = 0
|
|
|
|
def readline(self):
|
|
self.lineno += 1
|
|
return super().readline()
|
|
|
|
|
|
class ApoCSVReadWrite:
|
|
"Gestion lecture/écriture de fichiers csv Apogée"
|
|
|
|
def __init__(self, data: str):
|
|
if not data:
|
|
raise ScoFormatError("Fichier Apogée vide !")
|
|
self.data = data
|
|
self._file = StringIOWithLineNumber(data) # pour traiter comme un fichier
|
|
self.apo_elts: dict = None
|
|
self.cols: dict[str, dict[str, str]] = None
|
|
self.column_titles: str = None
|
|
self.col_ids: list[str] = None
|
|
self.csv_etuds: list[ApoEtudTuple] = []
|
|
# section_str: utilisé pour ré-écrire les headers sans aucune altération
|
|
self.sections_str: dict[str, str] = {}
|
|
"contenu initial de chaque section"
|
|
# self.header: str = ""
|
|
# "début du fichier Apogée jusqu'à XX-APO_TYP_RES-XX non inclu (sera ré-écrit non modifié)"
|
|
self.header_apo_typ_res: str = ""
|
|
"section XX-APO_TYP_RES-XX (qui peut en option ne pas être ré-écrite)"
|
|
self.titles: dict[str, str] = {}
|
|
"titres Apogée (section XX-APO_TITRES-XX)"
|
|
|
|
self.read_sections()
|
|
|
|
# Check that we have collected all requested infos:
|
|
if not self.header_apo_typ_res:
|
|
# on pourrait rendre XX-APO_TYP_RES-XX optionnelle mais mieux vaut vérifier:
|
|
raise ScoFormatError(
|
|
"format incorrect: pas de XX-APO_TYP_RES-XX",
|
|
filename=self.get_filename(),
|
|
)
|
|
if self.cols is None:
|
|
raise ScoFormatError(
|
|
"format incorrect: pas de XX-APO_COLONNES-XX",
|
|
filename=self.get_filename(),
|
|
)
|
|
if self.column_titles is None:
|
|
raise ScoFormatError(
|
|
"format incorrect: pas de XX-APO_VALEURS-XX",
|
|
filename=self.get_filename(),
|
|
)
|
|
|
|
def read_sections(self):
|
|
"""Lit une à une les sections du fichier Apogée"""
|
|
# sanity check: we are at the begining of Apogee CSV
|
|
start_pos = self._file.tell()
|
|
section = self._file.readline().strip()
|
|
if section != "XX-APO_TITRES-XX":
|
|
raise ScoFormatError("format incorrect: pas de XX-APO_TITRES-XX")
|
|
|
|
while True:
|
|
self.read_section(section)
|
|
line, end_pos = _apo_next_non_blank_line(self._file)
|
|
self.sections_str[section] = self.data[start_pos:end_pos]
|
|
if not line:
|
|
break
|
|
section = line
|
|
start_pos = end_pos
|
|
|
|
def read_section(self, section_name: str):
|
|
"""Read a section: _file is on the first line after section title"""
|
|
if section_name == "XX-APO_TITRES-XX":
|
|
# Titres:
|
|
# on va y chercher apoC_Fichier_Exp qui donnera le nom du fichier
|
|
# ainsi que l'année scolaire et le code diplôme.
|
|
self.titles = self._apo_read_titres(self._file)
|
|
elif section_name == "XX-APO_TYP_RES-XX":
|
|
self.header_apo_typ_res = _apo_read_typ_res(self._file)
|
|
elif section_name == "XX-APO_COLONNES-XX":
|
|
self.cols = self.apo_read_cols()
|
|
self.apo_elts = self.group_elt_cols(self.cols)
|
|
elif section_name == "XX-APO_VALEURS-XX":
|
|
# les étudiants
|
|
self.apo_read_section_valeurs()
|
|
else:
|
|
raise ScoFormatError(
|
|
f"format incorrect: section inconnue: {section_name}",
|
|
filename=self.get_filename(),
|
|
)
|
|
|
|
def apo_read_cols(self):
|
|
"""Lecture colonnes apo :
|
|
Démarre après la balise XX-APO_COLONNES-XX
|
|
et s'arrête après la ligne suivant la balise APO_COL_VAL_FIN
|
|
|
|
Colonne Apogee: les champs sont données par la ligne
|
|
apoL_a01_code de la section XX-APO_COLONNES-XX
|
|
col_id est apoL_c0001, apoL_c0002, ...
|
|
|
|
:return: { col_id : { title : value } }
|
|
Example: { 'apoL_c0001' : { 'Type Objet' : 'VET', 'Code' : 'V1IN', ... }, ... }
|
|
"""
|
|
line = self._file.readline().strip(" " + APO_NEWLINE)
|
|
fields = line.split(APO_SEP)
|
|
if fields[0] != "apoL_a01_code":
|
|
raise ScoFormatError(
|
|
f"invalid line: {line} (expecting apoL_a01_code)",
|
|
filename=self.get_filename(),
|
|
)
|
|
col_keys = fields
|
|
|
|
while True: # skip premiere partie (apoL_a02_nom, ...)
|
|
line = self._file.readline().strip(" " + APO_NEWLINE)
|
|
if line == "APO_COL_VAL_DEB":
|
|
break
|
|
# après APO_COL_VAL_DEB
|
|
cols = {}
|
|
i = 0
|
|
while True:
|
|
line = self._file.readline().strip(" " + APO_NEWLINE)
|
|
if line == "APO_COL_VAL_FIN":
|
|
break
|
|
i += 1
|
|
fields = line.split(APO_SEP)
|
|
# sanity check
|
|
col_id = fields[0] # apoL_c0001, ...
|
|
if col_id in cols:
|
|
raise ScoFormatError(
|
|
f"duplicate column definition: {col_id}",
|
|
filename=self.get_filename(),
|
|
)
|
|
m = re.match(r"^apoL_c([0-9]{4})$", col_id)
|
|
if not m:
|
|
raise ScoFormatError(
|
|
f"invalid column id: {line} (expecting apoL_c{col_id})",
|
|
filename=self.get_filename(),
|
|
)
|
|
if int(m.group(1)) != i:
|
|
raise ScoFormatError(
|
|
f"invalid column id: {col_id} for index {i}",
|
|
filename=self.get_filename(),
|
|
)
|
|
|
|
cols[col_id] = DictCol(list(zip(col_keys, fields)))
|
|
cols[col_id].lineno = self._file.lineno # for debuging purpose
|
|
|
|
self._file.readline() # skip next line
|
|
|
|
return cols
|
|
|
|
def group_elt_cols(self, cols) -> dict:
|
|
"""Return (ordered) dict of ApoElt from list of ApoCols.
|
|
Clé: id apogée, eg 'V1RT', 'V1GE2201', ...
|
|
Valeur: ApoElt, avec les attributs code, type_objet
|
|
|
|
Si les id Apogée ne sont pas uniques (ce n'est pas garanti), garde le premier
|
|
"""
|
|
elts = {}
|
|
for col_id in sorted(list(cols.keys()), reverse=True):
|
|
col = cols[col_id]
|
|
if col["Code"] in elts:
|
|
elts[col["Code"]].append(col)
|
|
else:
|
|
elts[col["Code"]] = ApoElt([col])
|
|
return elts # { code apo : ApoElt }
|
|
|
|
def apo_read_section_valeurs(self):
|
|
"traitement de la section XX-APO_VALEURS-XX"
|
|
self.column_titles = self._file.readline()
|
|
self.col_ids = self.column_titles.strip().split()
|
|
self.csv_etuds = self.apo_read_etuds()
|
|
|
|
def apo_read_etuds(self) -> list[ApoEtudTuple]:
|
|
"""Lecture des étudiants (et résultats) du fichier CSV Apogée.
|
|
Les lignes "étudiant" commencent toujours par
|
|
`12345678 NOM PRENOM 15/05/2003`
|
|
le premier code étant le NIP.
|
|
"""
|
|
etud_tuples = []
|
|
while True:
|
|
line = self._file.readline()
|
|
# cette section est impérativement la dernière du fichier
|
|
# donc on arrête ici:
|
|
if not line:
|
|
break
|
|
if not line.strip():
|
|
continue # silently ignore blank lines
|
|
line = line.strip(APO_NEWLINE)
|
|
fields = line.split(APO_SEP)
|
|
if len(fields) < 4:
|
|
raise ScoFormatError(
|
|
"""Ligne étudiant invalide
|
|
(doit commencer par 'NIP NOM PRENOM dd/mm/yyyy')""",
|
|
filename=self.get_filename(),
|
|
)
|
|
cols = {} # { col_id : value }
|
|
for i, field in enumerate(fields):
|
|
cols[self.col_ids[i]] = field
|
|
etud_tuples.append(
|
|
ApoEtudTuple(
|
|
nip=fields[0], # id etudiant
|
|
nom=fields[1],
|
|
prenom=fields[2],
|
|
naissance=fields[3],
|
|
cols=cols,
|
|
)
|
|
# XXX à remettre dans apogee_csv.py
|
|
# export_res_etape=self.export_res_etape,
|
|
# export_res_sem=self.export_res_sem,
|
|
# export_res_ues=self.export_res_ues,
|
|
# export_res_modules=self.export_res_modules,
|
|
# export_res_sdj=self.export_res_sdj,
|
|
# export_res_rat=self.export_res_rat,
|
|
# )
|
|
)
|
|
|
|
return etud_tuples
|
|
|
|
def _apo_read_titres(self, f) -> dict:
|
|
"Lecture section TITRES du fichier Apogée, renvoie dict"
|
|
d = {}
|
|
while True:
|
|
line = f.readline().strip(
|
|
" " + APO_NEWLINE
|
|
) # ne retire pas le \t (pour les clés vides)
|
|
if not line.strip(): # stoppe sur ligne pleines de \t
|
|
break
|
|
|
|
fields = line.split(APO_SEP)
|
|
if len(fields) == 2:
|
|
k, v = fields
|
|
else:
|
|
log(f"Error read CSV: \nline={line}\nfields={fields}")
|
|
log(dir(f))
|
|
raise ScoFormatError(
|
|
f"Fichier Apogee incorrect (section titres, {len(fields)} champs au lieu de 2)",
|
|
filename=self.get_filename(),
|
|
)
|
|
d[k] = v
|
|
#
|
|
if not d.get("apoC_Fichier_Exp", None):
|
|
raise ScoFormatError(
|
|
"Fichier Apogee incorrect: pas de titre apoC_Fichier_Exp",
|
|
filename=self.get_filename(),
|
|
)
|
|
# keep only basename: may be a windows or unix pathname
|
|
s = d["apoC_Fichier_Exp"].split("/")[-1]
|
|
s = s.split("\\")[-1] # for DOS paths, eg C:\TEMP\VL4RT_V3ASR.TXT
|
|
d["apoC_Fichier_Exp"] = s
|
|
return d
|
|
|
|
def get_filename(self) -> str:
|
|
"""Le nom du fichier APogée, tel qu'indiqué dans le fichier
|
|
ou vide."""
|
|
if self.titles:
|
|
return self.titles.get("apoC_Fichier_Exp", "")
|
|
return ""
|
|
|
|
def write(self, apo_etuds: list["ApoEtud"]) -> bytes:
|
|
"""Renvoie le contenu actualisé du fichier Apogée"""
|
|
f = io.StringIO()
|
|
self._write_header(f)
|
|
self._write_etuds(f, apo_etuds)
|
|
return f.getvalue().encode(APO_OUTPUT_ENCODING)
|
|
|
|
def _write_etuds(self, f, apo_etuds: list["ApoEtud"]):
|
|
"""write apo CSV etuds on f"""
|
|
for apo_etud in apo_etuds:
|
|
fields = [] # e['nip'], e['nom'], e['prenom'], e['naissance'] ]
|
|
for col_id in self.col_ids:
|
|
try:
|
|
fields.append(str(apo_etud.new_cols[col_id]))
|
|
except KeyError:
|
|
log(
|
|
f"""Error: {apo_etud["nip"]} {apo_etud["nom"]} missing column key {col_id}
|
|
Details:\napo_etud = {pprint.pformat(apo_etud)}
|
|
col_ids={pprint.pformat(self.col_ids)}
|
|
étudiant ignoré.
|
|
"""
|
|
)
|
|
f.write(APO_SEP.join(fields) + APO_NEWLINE)
|
|
|
|
def _write_header(self, f):
|
|
"""write apo CSV header on f
|
|
(beginning of CSV until columns titles just after XX-APO_VALEURS-XX line)
|
|
"""
|
|
for section, data in self.sections_str.items():
|
|
if section != "XX-APO_VALEURS-XX":
|
|
# XXX TODO ici on va filtrer XX-APO_TYP_RES-XX
|
|
f.write(data)
|
|
|
|
f.write("XX-APO_VALEURS-XX" + APO_NEWLINE)
|
|
f.write(self.column_titles)
|
|
|
|
|
|
class ApoElt:
|
|
"""Définition d'un Element Apogée
|
|
sur plusieurs colonnes du fichier CSV
|
|
"""
|
|
|
|
def __init__(self, cols):
|
|
assert len(cols) > 0
|
|
assert len(set([c["Code"] for c in cols])) == 1 # colonnes de meme code
|
|
assert len(set([c["Type Objet"] for c in cols])) == 1 # colonnes de meme type
|
|
self.cols = cols
|
|
self.code = cols[0]["Code"]
|
|
self.version = cols[0]["Version"]
|
|
self.type_objet = cols[0]["Type Objet"]
|
|
|
|
def append(self, col):
|
|
"""ajoute une "colonne" à l'élément"""
|
|
assert col["Code"] == self.code
|
|
if col["Type Objet"] != self.type_objet:
|
|
log(
|
|
f"""Warning: ApoElt: duplicate id {
|
|
self.code} ({self.type_objet} and {col["Type Objet"]})"""
|
|
)
|
|
self.type_objet = col["Type Objet"]
|
|
self.cols.append(col)
|
|
|
|
def __repr__(self):
|
|
return f"ApoElt(code='{self.code}', cols={pprint.pformat(self.cols)})"
|
|
|
|
|
|
def guess_data_encoding(text: bytes, threshold=0.6):
|
|
"""Guess string encoding, using chardet heuristics.
|
|
Returns encoding, or None if detection failed (confidence below threshold)
|
|
"""
|
|
r = chardet_detect(text)
|
|
if r["confidence"] < threshold:
|
|
return None
|
|
else:
|
|
return r["encoding"]
|
|
|
|
|
|
def fix_data_encoding(
|
|
text: bytes,
|
|
default_source_encoding=APO_INPUT_ENCODING,
|
|
dest_encoding=APO_INPUT_ENCODING,
|
|
) -> tuple[bytes, str]:
|
|
"""Try to ensure that text is using dest_encoding
|
|
returns converted text, and a message describing the conversion.
|
|
|
|
Raises UnicodeEncodeError en cas de problème, en général liée à
|
|
une auto-détection errornée.
|
|
"""
|
|
message = ""
|
|
detected_encoding = guess_data_encoding(text)
|
|
if not detected_encoding:
|
|
if default_source_encoding != dest_encoding:
|
|
message = f"converting from {default_source_encoding} to {dest_encoding}"
|
|
text = text.decode(default_source_encoding).encode(dest_encoding)
|
|
else:
|
|
if detected_encoding != dest_encoding:
|
|
message = (
|
|
f"converting from detected {default_source_encoding} to {dest_encoding}"
|
|
)
|
|
text = text.decode(detected_encoding).encode(dest_encoding)
|
|
return text, message
|
|
|
|
|
|
def _apo_read_typ_res(f) -> str:
|
|
"Lit la section XX-APO_TYP_RES-XX"
|
|
text = "XX-APO_TYP_RES-XX" + APO_NEWLINE
|
|
while True:
|
|
line = f.readline()
|
|
stripped_line = line.strip()
|
|
if not stripped_line:
|
|
break
|
|
text += line
|
|
return text
|
|
|
|
|
|
def _apo_next_non_blank_line(f: StringIOWithLineNumber) -> tuple[str, int]:
|
|
"Ramène prochaine ligne non blanche, stripped, et l'indice de son début"
|
|
while True:
|
|
pos = f.tell()
|
|
line = f.readline()
|
|
if not line:
|
|
return "", -1
|
|
stripped_line = line.strip()
|
|
if stripped_line:
|
|
return stripped_line, pos
|