############################################################################## # # Gestion scolarite IUT # # Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Emmanuel Viennet emmanuel.viennet@viennet.net # ############################################################################## """Lecture du fichier "maquette" Apogée Le fichier CSV, champs séparés par des tabulations, a la structure suivante: <pre> XX-APO_TITRES-XX apoC_annee 2007/2008 apoC_cod_dip VDTCJ apoC_Cod_Exp 1 apoC_cod_vdi 111 apoC_Fichier_Exp VDTCJ_V1CJ.txt apoC_lib_dip DUT CJ apoC_Titre1 Export Apogée du 13/06/2008 à 14:29 apoC_Titre2 XX-APO_TYP_RES-XX ...section optionnelle au contenu quelconque... XX-APO_COLONNES-XX apoL_a01_code Type Objet Code Version Année Session Admission/Admissibilité Type Rés. Etudiant Numéro apoL_a02_nom 1 Nom apoL_a03_prenom 1 Prénom apoL_a04_naissance Session Admissibilité Naissance APO_COL_VAL_DEB apoL_c0001 VET V1CJ 111 2007 0 1 N V1CJ - DUT CJ an1 0 1 Note apoL_c0002 VET V1CJ 111 2007 0 1 B 0 1 Barème apoL_c0003 VET V1CJ 111 2007 0 1 R 0 1 Résultat APO_COL_VAL_FIN apoL_c0030 APO_COL_VAL_FIN XX-APO_VALEURS-XX apoL_a01_code apoL_a02_nom apoL_a03_prenom apoL_a04_naissance apoL_c0001 apoL_c0002 apoL_c0003 apoL_c0004 apoL_c0005 apoL_c0006 apoL_c0007 apoL_c0008 apoL_c0009 apoL_c0010 apoL_c0011 apoL_c0012 apoL_c0013 apoL_c0014 apoL_c0015 apoL_c0016 apoL_c0017 apoL_c0018 apoL_c0019 apoL_c0020 apoL_c0021 apoL_c0022 apoL_c0023 apoL_c0024 apoL_c0025 apoL_c0026 apoL_c0027 apoL_c0028 apoL_c0029 10601232 AARIF MALIKA 22/09/1986 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM 18 20 18 20 ADM 18 20 ADM 18 20 ADM 18 20 ADM </pre> On récupère nos éléments pédagogiques dans la section XX-APO-COLONNES-XX et notre liste d'étudiants dans la section XX-APO_VALEURS-XX. Les champs de la section XX-APO_VALEURS-XX sont décrits par les lignes successives de la section XX-APO_COLONNES-XX. Le fichier CSV correspond à une étape, qui est récupérée sur la ligne <pre> apoL_c0001 VET V1CJ ... </pre> """ from collections import namedtuple import io import pprint import re # Pour la détection auto de l'encodage des fichiers Apogée: from chardet import detect as chardet_detect from app import log from app.scodoc.sco_exceptions import ScoFormatError from app.scodoc import sco_preferences APO_PORTAL_ENCODING = ( "utf8" # encodage du fichier CSV Apogée (était 'ISO-8859-1' avant jul. 2016) ) APO_INPUT_ENCODING = "ISO-8859-1" # APO_OUTPUT_ENCODING = APO_INPUT_ENCODING # encodage des fichiers Apogee générés APO_DECIMAL_SEP = "," # separateur décimal: virgule APO_SEP = "\t" APO_NEWLINE = "\r\n" ApoEtudTuple = namedtuple("ApoEtudTuple", ("nip", "nom", "prenom", "naissance", "cols")) class DictCol(dict): "A dict, where we can add attributes" class StringIOWithLineNumber(io.StringIO): "simple wrapper to use a string as a file with line numbers" def __init__(self, data: str): super().__init__(data) self.lineno = 0 def readline(self): self.lineno += 1 return super().readline() class ApoCSVReadWrite: "Gestion lecture/écriture de fichiers csv Apogée" def __init__(self, data: str): if not data: raise ScoFormatError("Fichier Apogée vide !") self.data = data self._file = StringIOWithLineNumber(data) # pour traiter comme un fichier self.apo_elts: dict = None self.cols: dict[str, dict[str, str]] = None self.column_titles: str = None self.col_ids: list[str] = None self.csv_etuds: list[ApoEtudTuple] = [] # section_str: utilisé pour ré-écrire les headers sans aucune altération self.sections_str: dict[str, str] = {} "contenu initial de chaque section" # self.header: str = "" # "début du fichier Apogée jusqu'à XX-APO_TYP_RES-XX non inclu (sera ré-écrit non modifié)" self.header_apo_typ_res: str = "" "section XX-APO_TYP_RES-XX (qui peut en option ne pas être ré-écrite)" self.titles: dict[str, str] = {} "titres Apogée (section XX-APO_TITRES-XX)" self.read_sections() # Check that we have collected all requested infos: if not self.header_apo_typ_res: # on pourrait rendre XX-APO_TYP_RES-XX optionnelle mais mieux vaut vérifier: raise ScoFormatError( "format incorrect: pas de XX-APO_TYP_RES-XX", filename=self.get_filename(), ) if self.cols is None: raise ScoFormatError( "format incorrect: pas de XX-APO_COLONNES-XX", filename=self.get_filename(), ) if self.column_titles is None: raise ScoFormatError( "format incorrect: pas de XX-APO_VALEURS-XX", filename=self.get_filename(), ) def read_sections(self): """Lit une à une les sections du fichier Apogée""" # sanity check: we are at the begining of Apogee CSV start_pos = self._file.tell() section = self._file.readline().strip() if section != "XX-APO_TITRES-XX": raise ScoFormatError("format incorrect: pas de XX-APO_TITRES-XX") while True: self.read_section(section) line, end_pos = _apo_next_non_blank_line(self._file) self.sections_str[section] = self.data[start_pos:end_pos] if not line: break section = line start_pos = end_pos def read_section(self, section_name: str): """Read a section: _file is on the first line after section title""" if section_name == "XX-APO_TITRES-XX": # Titres: # on va y chercher apoC_Fichier_Exp qui donnera le nom du fichier # ainsi que l'année scolaire et le code diplôme. self.titles = self._apo_read_titres(self._file) elif section_name == "XX-APO_TYP_RES-XX": self.header_apo_typ_res = _apo_read_typ_res(self._file) elif section_name == "XX-APO_COLONNES-XX": self.cols = self.apo_read_cols() self.apo_elts = self.group_elt_cols(self.cols) elif section_name == "XX-APO_VALEURS-XX": # les étudiants self.apo_read_section_valeurs() else: raise ScoFormatError( f"format incorrect: section inconnue: {section_name}", filename=self.get_filename(), ) def apo_read_cols(self): """Lecture colonnes apo : Démarre après la balise XX-APO_COLONNES-XX et s'arrête après la ligne suivant la balise APO_COL_VAL_FIN Colonne Apogee: les champs sont données par la ligne apoL_a01_code de la section XX-APO_COLONNES-XX col_id est apoL_c0001, apoL_c0002, ... :return: { col_id : { title : value } } Example: { 'apoL_c0001' : { 'Type Objet' : 'VET', 'Code' : 'V1IN', ... }, ... } """ line = self._file.readline().strip(" " + APO_NEWLINE) fields = line.split(APO_SEP) if fields[0] != "apoL_a01_code": raise ScoFormatError( f"invalid line: {line} (expecting apoL_a01_code)", filename=self.get_filename(), ) col_keys = fields while True: # skip premiere partie (apoL_a02_nom, ...) line = self._file.readline().strip(" " + APO_NEWLINE) if line == "APO_COL_VAL_DEB": break # après APO_COL_VAL_DEB cols = {} i = 0 while True: line = self._file.readline().strip(" " + APO_NEWLINE) if line == "APO_COL_VAL_FIN": break i += 1 fields = line.split(APO_SEP) # sanity check col_id = fields[0] # apoL_c0001, ... if col_id in cols: raise ScoFormatError( f"duplicate column definition: {col_id}", filename=self.get_filename(), ) m = re.match(r"^apoL_c([0-9]{4})$", col_id) if not m: raise ScoFormatError( f"invalid column id: {line} (expecting apoL_c{col_id})", filename=self.get_filename(), ) if int(m.group(1)) != i: raise ScoFormatError( f"invalid column id: {col_id} for index {i}", filename=self.get_filename(), ) cols[col_id] = DictCol(list(zip(col_keys, fields))) cols[col_id].lineno = self._file.lineno # for debuging purpose self._file.readline() # skip next line return cols def group_elt_cols(self, cols) -> dict: """Return (ordered) dict of ApoElt from list of ApoCols. Clé: id apogée, eg 'V1RT', 'V1GE2201', ... Valeur: ApoElt, avec les attributs code, type_objet Si les id Apogée ne sont pas uniques (ce n'est pas garanti), garde le premier """ elts = {} for col_id in sorted(list(cols.keys()), reverse=True): col = cols[col_id] if col["Code"] in elts: elts[col["Code"]].append(col) else: elts[col["Code"]] = ApoElt([col]) return elts # { code apo : ApoElt } def apo_read_section_valeurs(self): "traitement de la section XX-APO_VALEURS-XX" self.column_titles = self._file.readline() self.col_ids = self.column_titles.strip().split() self.csv_etuds = self.apo_read_etuds() def apo_read_etuds(self) -> list[ApoEtudTuple]: """Lecture des étudiants (et résultats) du fichier CSV Apogée. Les lignes "étudiant" commencent toujours par `12345678 NOM PRENOM 15/05/2003` le premier code étant le NIP. """ etud_tuples = [] while True: line = self._file.readline() # cette section est impérativement la dernière du fichier # donc on arrête ici: if not line: break if not line.strip(): continue # silently ignore blank lines line = line.strip(APO_NEWLINE) fields = line.split(APO_SEP) if len(fields) < 4: raise ScoFormatError( """Ligne étudiant invalide (doit commencer par 'NIP NOM PRENOM dd/mm/yyyy')""", filename=self.get_filename(), ) cols = {} # { col_id : value } try: for i, field in enumerate(fields): cols[self.col_ids[i]] = field except IndexError as exc: raise raise ScoFormatError( f"Fichier Apogee incorrect (colonnes excédentaires ? (<tt>{i}/{field}</tt>))", filename=self.get_filename(), ) from exc etud_tuples.append( ApoEtudTuple( nip=fields[0], # id etudiant nom=fields[1], prenom=fields[2], naissance=fields[3], cols=cols, ) # XXX à remettre dans apogee_csv.py # export_res_etape=self.export_res_etape, # export_res_sem=self.export_res_sem, # export_res_ues=self.export_res_ues, # export_res_modules=self.export_res_modules, # export_res_sdj=self.export_res_sdj, # export_res_rat=self.export_res_rat, # ) ) return etud_tuples def _apo_read_titres(self, f) -> dict: "Lecture section TITRES du fichier Apogée, renvoie dict" d = {} while True: line = f.readline().strip( " " + APO_NEWLINE ) # ne retire pas le \t (pour les clés vides) if not line.strip(): # stoppe sur ligne pleines de \t break fields = line.split(APO_SEP) if len(fields) == 2: k, v = fields else: log(f"Error read CSV: \nline={line}\nfields={fields}") log(dir(f)) raise ScoFormatError( f"Fichier Apogee incorrect (section titres, {len(fields)} champs au lieu de 2)", filename=self.get_filename(), ) d[k] = v # if not d.get("apoC_Fichier_Exp", None): raise ScoFormatError( "Fichier Apogee incorrect: pas de titre apoC_Fichier_Exp", filename=self.get_filename(), ) # keep only basename: may be a windows or unix pathname s = d["apoC_Fichier_Exp"].split("/")[-1] s = s.split("\\")[-1] # for DOS paths, eg C:\TEMP\VL4RT_V3ASR.TXT d["apoC_Fichier_Exp"] = s return d def get_filename(self) -> str: """Le nom du fichier APogée, tel qu'indiqué dans le fichier ou vide.""" if self.titles: return self.titles.get("apoC_Fichier_Exp", "") return "" def write(self, apo_etuds: list["ApoEtud"]) -> bytes: """Renvoie le contenu actualisé du fichier Apogée""" f = io.StringIO() self._write_header(f) self._write_etuds(f, apo_etuds) return f.getvalue().encode(APO_OUTPUT_ENCODING) def _write_etuds(self, f, apo_etuds: list["ApoEtud"]): """write apo CSV etuds on f""" for apo_etud in apo_etuds: fields = [] # e['nip'], e['nom'], e['prenom'], e['naissance'] ] for col_id in self.col_ids: try: fields.append(str(apo_etud.new_cols[col_id])) except KeyError: log( f"""Error: {apo_etud["nip"]} {apo_etud["nom"]} missing column key {col_id} Details:\napo_etud = {pprint.pformat(apo_etud)} col_ids={pprint.pformat(self.col_ids)} étudiant ignoré. """ ) f.write(APO_SEP.join(fields) + APO_NEWLINE) def _write_header(self, f): """write apo CSV header on f (beginning of CSV until columns titles just after XX-APO_VALEURS-XX line) """ remove_typ_res = sco_preferences.get_preference("export_res_remove_typ_res") for section, data in self.sections_str.items(): # ne recopie pas la section résultats, et en option supprime APO_TYP_RES if (section != "XX-APO_VALEURS-XX") and ( section != "XX-APO_TYP_RES-XX" or not remove_typ_res ): f.write(data) f.write("XX-APO_VALEURS-XX" + APO_NEWLINE) f.write(self.column_titles) class ApoElt: """Définition d'un Element Apogée sur plusieurs colonnes du fichier CSV """ def __init__(self, cols): assert len(cols) > 0 assert len(set([c["Code"] for c in cols])) == 1 # colonnes de meme code assert len(set([c["Type Objet"] for c in cols])) == 1 # colonnes de meme type self.cols = cols self.code = cols[0]["Code"] self.version = cols[0]["Version"] self.type_objet = cols[0]["Type Objet"] def append(self, col): """ajoute une "colonne" à l'élément""" assert col["Code"] == self.code if col["Type Objet"] != self.type_objet: log( f"""Warning: ApoElt: duplicate id { self.code} ({self.type_objet} and {col["Type Objet"]})""" ) self.type_objet = col["Type Objet"] self.cols.append(col) def __repr__(self): return f"ApoElt(code='{self.code}', cols={pprint.pformat(self.cols)})" def guess_data_encoding(text: bytes, threshold=0.6): """Guess string encoding, using chardet heuristics. Returns encoding, or None if detection failed (confidence below threshold) """ r = chardet_detect(text) if r["confidence"] < threshold: return None else: return r["encoding"] def fix_data_encoding( text: bytes, default_source_encoding=APO_INPUT_ENCODING, dest_encoding=APO_INPUT_ENCODING, ) -> tuple[bytes, str]: """Try to ensure that text is using dest_encoding returns converted text, and a message describing the conversion. Raises UnicodeEncodeError en cas de problème, en général liée à une auto-détection errornée. """ message = "" detected_encoding = guess_data_encoding(text) if not detected_encoding: if default_source_encoding != dest_encoding: message = f"converting from {default_source_encoding} to {dest_encoding}" text = text.decode(default_source_encoding).encode(dest_encoding) else: if detected_encoding != dest_encoding: message = ( f"converting from detected {default_source_encoding} to {dest_encoding}" ) text = text.decode(detected_encoding).encode(dest_encoding) return text, message def _apo_read_typ_res(f) -> str: "Lit la section XX-APO_TYP_RES-XX" text = "XX-APO_TYP_RES-XX" + APO_NEWLINE while True: line = f.readline() stripped_line = line.strip() if not stripped_line: break text += line return text def _apo_next_non_blank_line(f: StringIOWithLineNumber) -> tuple[str, int]: "Ramène prochaine ligne non blanche, stripped, et l'indice de son début" while True: pos = f.tell() line = f.readline() if not line: return "", -1 stripped_line = line.strip() if stripped_line: return stripped_line, pos