# Gestion scolarite IUT
# Copyright (c) 1999 - 2023 Emmanuel Viennet.  All rights reserved.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#   Emmanuel Viennet      emmanuel.viennet@viennet.net

"""Lecture du fichier "maquette" Apogée

Le fichier CSV, champs séparés par des tabulations, a la structure suivante:

 apoC_annee	2007/2008
 apoC_cod_dip	VDTCJ
 apoC_Cod_Exp	1
 apoC_cod_vdi	111
 apoC_Fichier_Exp	VDTCJ_V1CJ.txt
 apoC_lib_dip	DUT CJ
 apoC_Titre1	Export Apogée du 13/06/2008 à 14:29

 ...section optionnelle au contenu quelconque...

 apoL_a01_code	Type Objet	Code	Version	Année	Session	Admission/Admissibilité	Type Rés.			Etudiant	Numéro
 apoL_a02_nom										1	Nom
 apoL_a03_prenom										1	Prénom
 apoL_a04_naissance									Session	Admissibilité	Naissance
 apoL_c0001	VET	V1CJ	111	2007	0	1	N	V1CJ - DUT CJ an1	0	1	Note
 apoL_c0002	VET	V1CJ	111	2007	0	1	B		0	1	Barème
 apoL_c0003	VET	V1CJ	111	2007	0	1	R		0	1	Résultat
 apoL_c0030	APO_COL_VAL_FIN

 apoL_a01_code	apoL_a02_nom	apoL_a03_prenom	apoL_a04_naissance	apoL_c0001	apoL_c0002	apoL_c0003	apoL_c0004	apoL_c0005	apoL_c0006	apoL_c0007	apoL_c0008	apoL_c0009	apoL_c0010	apoL_c0011	apoL_c0012	apoL_c0013	apoL_c0014	apoL_c0015	apoL_c0016	apoL_c0017	apoL_c0018	apoL_c0019	apoL_c0020	apoL_c0021	apoL_c0022	apoL_c0023	apoL_c0024	apoL_c0025	apoL_c0026	apoL_c0027	apoL_c0028	apoL_c0029
 10601232	AARIF	MALIKA	 22/09/1986	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM	18	20	18	20	ADM	18	20	ADM	18	20	ADM	18	20	ADM

 On récupère nos éléments pédagogiques dans la section XX-APO-COLONNES-XX et
 notre liste d'étudiants dans la section XX-APO_VALEURS-XX. Les champs de la
 section XX-APO_VALEURS-XX sont décrits par les lignes successives de la

 Le fichier CSV correspond à une étape, qui est récupérée sur la ligne
 apoL_c0001	VET	V1CJ ...
from collections import namedtuple
import io
import pprint
import re

# Pour la détection auto de l'encodage des fichiers Apogée:
from chardet import detect as chardet_detect

from app import log
from app.scodoc.sco_exceptions import ScoFormatError
from app.scodoc import sco_preferences

    "utf8"  # encodage du fichier CSV Apogée (était 'ISO-8859-1' avant jul. 2016)
APO_OUTPUT_ENCODING = APO_INPUT_ENCODING  # encodage des fichiers Apogee générés
APO_DECIMAL_SEP = ","  # separateur décimal: virgule
APO_SEP = "\t"
APO_NEWLINE = "\r\n"

ApoEtudTuple = namedtuple("ApoEtudTuple", ("nip", "nom", "prenom", "naissance", "cols"))

class DictCol(dict):
    "A dict, where we can add attributes"

class StringIOWithLineNumber(io.StringIO):
    "simple wrapper to use a string as a file with line numbers"

    def __init__(self, data: str):
        self.lineno = 0

    def readline(self):
        self.lineno += 1
        return super().readline()

class ApoCSVReadWrite:
    "Gestion lecture/écriture de fichiers csv Apogée"

    def __init__(self, data: str):
        if not data:
            raise ScoFormatError("Fichier Apogée vide !")
        self.data = data
        self._file = StringIOWithLineNumber(data)  # pour traiter comme un fichier
        self.apo_elts: dict = None
        self.cols: dict[str, dict[str, str]] = None
        self.column_titles: str = None
        self.col_ids: list[str] = None
        self.csv_etuds: list[ApoEtudTuple] = []
        # section_str: utilisé pour ré-écrire les headers sans aucune altération
        self.sections_str: dict[str, str] = {}
        "contenu initial de chaque section"
        # self.header: str = ""
        # "début du fichier Apogée jusqu'à XX-APO_TYP_RES-XX non inclu (sera ré-écrit non modifié)"
        self.header_apo_typ_res: str = ""
        "section XX-APO_TYP_RES-XX (qui peut en option ne pas être ré-écrite)"
        self.titles: dict[str, str] = {}
        "titres Apogée (section XX-APO_TITRES-XX)"


        # Check that we have collected all requested infos:
        if not self.header_apo_typ_res:
            # on pourrait rendre XX-APO_TYP_RES-XX optionnelle mais mieux vaut vérifier:
            raise ScoFormatError(
                "format incorrect: pas de XX-APO_TYP_RES-XX",
        if self.cols is None:
            raise ScoFormatError(
                "format incorrect: pas de XX-APO_COLONNES-XX",
        if self.column_titles is None:
            raise ScoFormatError(
                "format incorrect: pas de XX-APO_VALEURS-XX",

    def read_sections(self):
        """Lit une à une les sections du fichier Apogée"""
        # sanity check: we are at the begining of Apogee CSV
        start_pos = self._file.tell()
        section = self._file.readline().strip()
        if section != "XX-APO_TITRES-XX":
            raise ScoFormatError("format incorrect: pas de XX-APO_TITRES-XX")

        while True:
            line, end_pos = _apo_next_non_blank_line(self._file)
            self.sections_str[section] = self.data[start_pos:end_pos]
            if not line:
            section = line
            start_pos = end_pos

    def read_section(self, section_name: str):
        """Read a section: _file is on the first line after section title"""
        if section_name == "XX-APO_TITRES-XX":
            # Titres:
            #   on va y chercher apoC_Fichier_Exp qui donnera le nom du fichier
            #   ainsi que l'année scolaire et le code diplôme.
            self.titles = self._apo_read_titres(self._file)
        elif section_name == "XX-APO_TYP_RES-XX":
            self.header_apo_typ_res = _apo_read_typ_res(self._file)
        elif section_name == "XX-APO_COLONNES-XX":
            self.cols = self.apo_read_cols()
            self.apo_elts = self.group_elt_cols(self.cols)
        elif section_name == "XX-APO_VALEURS-XX":
            # les étudiants
            raise ScoFormatError(
                f"format incorrect: section inconnue: {section_name}",

    def apo_read_cols(self):
        """Lecture colonnes apo :
        Démarre après la balise XX-APO_COLONNES-XX
        et s'arrête après la ligne suivant la balise APO_COL_VAL_FIN

        Colonne Apogee: les champs sont données par la ligne
        apoL_a01_code de la section XX-APO_COLONNES-XX
        col_id est apoL_c0001, apoL_c0002, ...

        :return: { col_id : { title : value } }
        Example: { 'apoL_c0001' : { 'Type Objet' : 'VET', 'Code' : 'V1IN', ... }, ... }
        line = self._file.readline().strip(" " + APO_NEWLINE)
        fields = line.split(APO_SEP)
        if fields[0] != "apoL_a01_code":
            raise ScoFormatError(
                f"invalid line: {line} (expecting apoL_a01_code)",
        col_keys = fields

        while True:  # skip premiere partie (apoL_a02_nom, ...)
            line = self._file.readline().strip(" " + APO_NEWLINE)
            if line == "APO_COL_VAL_DEB":
        # après APO_COL_VAL_DEB
        cols = {}
        i = 0
        while True:
            line = self._file.readline().strip(" " + APO_NEWLINE)
            if line == "APO_COL_VAL_FIN":
            i += 1
            fields = line.split(APO_SEP)
            # sanity check
            col_id = fields[0]  # apoL_c0001, ...
            if col_id in cols:
                raise ScoFormatError(
                    f"duplicate column definition: {col_id}",
            m = re.match(r"^apoL_c([0-9]{4})$", col_id)
            if not m:
                raise ScoFormatError(
                    f"invalid column id: {line} (expecting apoL_c{col_id})",
            if int(m.group(1)) != i:
                raise ScoFormatError(
                    f"invalid column id: {col_id} for index {i}",

            cols[col_id] = DictCol(list(zip(col_keys, fields)))
            cols[col_id].lineno = self._file.lineno  # for debuging purpose

        self._file.readline()  # skip next line

        return cols

    def group_elt_cols(self, cols) -> dict:
        """Return (ordered) dict of ApoElt from list of ApoCols.
        Clé: id apogée, eg 'V1RT', 'V1GE2201', ...
        Valeur: ApoElt, avec les attributs code, type_objet

        Si les id Apogée ne sont pas uniques (ce n'est pas garanti), garde le premier
        elts = {}
        for col_id in sorted(list(cols.keys()), reverse=True):
            col = cols[col_id]
            if col["Code"] in elts:
                elts[col["Code"]] = ApoElt([col])
        return elts  # { code apo : ApoElt }

    def apo_read_section_valeurs(self):
        "traitement de la section XX-APO_VALEURS-XX"
        self.column_titles = self._file.readline()
        self.col_ids = self.column_titles.strip().split()
        self.csv_etuds = self.apo_read_etuds()

    def apo_read_etuds(self) -> list[ApoEtudTuple]:
        """Lecture des étudiants (et résultats) du fichier CSV Apogée.
        Les lignes "étudiant" commencent toujours par
        `12345678	NOM	PRENOM	 15/05/2003`
        le premier code étant le NIP.
        etud_tuples = []
        while True:
            line = self._file.readline()
            # cette section est impérativement la dernière du fichier
            # donc on arrête ici:
            if not line:
            if not line.strip():
                continue  # silently ignore blank lines
            line = line.strip(APO_NEWLINE)
            fields = line.split(APO_SEP)
            if len(fields) < 4:
                raise ScoFormatError(
                    """Ligne étudiant invalide 
                    (doit commencer par 'NIP NOM PRENOM dd/mm/yyyy')""",
            cols = {}  # { col_id : value }
            for i, field in enumerate(fields):
                cols[self.col_ids[i]] = field
                    nip=fields[0],  # id etudiant
                # XXX à remettre dans apogee_csv.py
                #     export_res_etape=self.export_res_etape,
                #     export_res_sem=self.export_res_sem,
                #     export_res_ues=self.export_res_ues,
                #     export_res_modules=self.export_res_modules,
                #     export_res_sdj=self.export_res_sdj,
                #     export_res_rat=self.export_res_rat,
                # )

        return etud_tuples

    def _apo_read_titres(self, f) -> dict:
        "Lecture section TITRES du fichier Apogée, renvoie dict"
        d = {}
        while True:
            line = f.readline().strip(
                " " + APO_NEWLINE
            )  # ne retire pas le \t (pour les clés vides)
            if not line.strip():  # stoppe sur ligne pleines de \t

            fields = line.split(APO_SEP)
            if len(fields) == 2:
                k, v = fields
                log(f"Error read CSV: \nline={line}\nfields={fields}")
                raise ScoFormatError(
                    f"Fichier Apogee incorrect (section titres, {len(fields)} champs au lieu de 2)",
            d[k] = v
        if not d.get("apoC_Fichier_Exp", None):
            raise ScoFormatError(
                "Fichier Apogee incorrect: pas de titre apoC_Fichier_Exp",
        # keep only basename: may be a windows or unix pathname
        s = d["apoC_Fichier_Exp"].split("/")[-1]
        s = s.split("\\")[-1]  # for DOS paths, eg C:\TEMP\VL4RT_V3ASR.TXT
        d["apoC_Fichier_Exp"] = s
        return d

    def get_filename(self) -> str:
        """Le nom du fichier APogée, tel qu'indiqué dans le fichier
        ou vide."""
        if self.titles:
            return self.titles.get("apoC_Fichier_Exp", "")
        return ""

    def write(self, apo_etuds: list["ApoEtud"]) -> bytes:
        """Renvoie le contenu actualisé du fichier Apogée"""
        f = io.StringIO()
        self._write_etuds(f, apo_etuds)
        return f.getvalue().encode(APO_OUTPUT_ENCODING)

    def _write_etuds(self, f, apo_etuds: list["ApoEtud"]):
        """write apo CSV etuds on f"""
        for apo_etud in apo_etuds:
            fields = []  #  e['nip'], e['nom'], e['prenom'], e['naissance'] ]
            for col_id in self.col_ids:
                except KeyError:
                        f"""Error: {apo_etud["nip"]} {apo_etud["nom"]} missing column key {col_id}
Details:\napo_etud = {pprint.pformat(apo_etud)}
étudiant ignoré.
            f.write(APO_SEP.join(fields) + APO_NEWLINE)

    def _write_header(self, f):
        """write apo CSV header on f
        (beginning of CSV until columns titles just after XX-APO_VALEURS-XX line)
        remove_typ_res = sco_preferences.get_preference("export_res_remove_typ_res")
        for section, data in self.sections_str.items():
            # ne recopie pas la section résultats, et en option supprime APO_TYP_RES
            if (section != "XX-APO_VALEURS-XX") and (
                section != "XX-APO_TYP_RES-XX" or not remove_typ_res

        f.write("XX-APO_VALEURS-XX" + APO_NEWLINE)

class ApoElt:
    """Définition d'un Element Apogée
    sur plusieurs colonnes du fichier CSV

    def __init__(self, cols):
        assert len(cols) > 0
        assert len(set([c["Code"] for c in cols])) == 1  # colonnes de meme code
        assert len(set([c["Type Objet"] for c in cols])) == 1  # colonnes de meme type
        self.cols = cols
        self.code = cols[0]["Code"]
        self.version = cols[0]["Version"]
        self.type_objet = cols[0]["Type Objet"]

    def append(self, col):
        """ajoute une "colonne" à l'élément"""
        assert col["Code"] == self.code
        if col["Type Objet"] != self.type_objet:
                f"""Warning: ApoElt: duplicate id {
                        self.code} ({self.type_objet} and {col["Type Objet"]})"""
            self.type_objet = col["Type Objet"]

    def __repr__(self):
        return f"ApoElt(code='{self.code}', cols={pprint.pformat(self.cols)})"

def guess_data_encoding(text: bytes, threshold=0.6):
    """Guess string encoding, using chardet heuristics.
    Returns encoding, or None if detection failed (confidence below threshold)
    r = chardet_detect(text)
    if r["confidence"] < threshold:
        return None
        return r["encoding"]

def fix_data_encoding(
    text: bytes,
) -> tuple[bytes, str]:
    """Try to ensure that text is using dest_encoding
    returns converted text, and a message describing the conversion.

    Raises UnicodeEncodeError en cas de problème, en général liée à
    une auto-détection errornée.
    message = ""
    detected_encoding = guess_data_encoding(text)
    if not detected_encoding:
        if default_source_encoding != dest_encoding:
            message = f"converting from {default_source_encoding} to {dest_encoding}"
            text = text.decode(default_source_encoding).encode(dest_encoding)
        if detected_encoding != dest_encoding:
            message = (
                f"converting from detected {default_source_encoding} to {dest_encoding}"
            text = text.decode(detected_encoding).encode(dest_encoding)
    return text, message

def _apo_read_typ_res(f) -> str:
    "Lit la section XX-APO_TYP_RES-XX"
    while True:
        line = f.readline()
        stripped_line = line.strip()
        if not stripped_line:
        text += line
    return text

def _apo_next_non_blank_line(f: StringIOWithLineNumber) -> tuple[str, int]:
    "Ramène prochaine ligne non blanche, stripped, et l'indice de son début"
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            return "", -1
        stripped_line = line.strip()
        if stripped_line:
            return stripped_line, pos