# -*- mode: python -*- # -*- coding: utf-8 -*- ############################################################################## # # Gestion scolarite IUT # # Copyright (c) 1999 - 2024 Emmanuel Viennet. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Emmanuel Viennet emmanuel.viennet@viennet.net # ############################################################################## """Liaison avec le portail ENT (qui donne accès aux infos Apogée) """ import datetime import os import time import urllib import xml import xml.sax.saxutils import xml.dom.minidom from flask import flash import app.scodoc.sco_utils as scu from app import log from app.scodoc import sco_cache from app.scodoc.sco_exceptions import ScoValueError from app.scodoc import sco_preferences SCO_CACHE_ETAPE_FILENAME = os.path.join(scu.SCO_TMP_DIR, "last_etapes.xml") class ApoInscritsEtapeCache(sco_cache.ScoDocCache): """Cache liste des inscrits à une étape Apogée""" timeout = 10 * 60 # 10 minutes prefix = "APOINSCRETAP" def has_portal(): "True if we are connected to a portal" return get_portal_url() class PortalInterface(object): def __init__(self): self.first_time = True def get_portal_url(self): "URL of portal" portal_url = sco_preferences.get_preference("portal_url") if portal_url and not portal_url.endswith("/"): portal_url += "/" if self.first_time: if portal_url: log(f"Portal URL={portal_url}") else: log("Portal not configured") self.first_time = False return portal_url def get_etapes_url(self): "Full URL of service giving list of etapes (in XML)" etapes_url = sco_preferences.get_preference("etapes_url") if not etapes_url: # Default: portal_url = self.get_portal_url() if not portal_url: return None api_ver = self.get_portal_api_version() if api_ver > 1: etapes_url = portal_url + "scodocEtapes.php" else: etapes_url = portal_url + "getEtapes.php" return etapes_url def get_etud_url(self): "Full URL of service giving list of students (in XML)" etud_url = sco_preferences.get_preference("etud_url") if not etud_url: # Default: portal_url = self.get_portal_url() if not portal_url: return None api_ver = self.get_portal_api_version() if api_ver > 1: etud_url = portal_url + "scodocEtudiant.php" else: etud_url = portal_url + "getEtud.php" return etud_url def get_photo_url(self): "Full URL of service giving photo of student" photo_url = sco_preferences.get_preference("photo_url") if not photo_url: # Default: portal_url = self.get_portal_url() if not portal_url: return None api_ver = self.get_portal_api_version() if api_ver > 1: photo_url = portal_url + "scodocPhoto.php" else: photo_url = portal_url + "getPhoto.php" return photo_url def get_maquette_url(self): """Full URL of service giving Apogee maquette pour une étape (fichier "CSV")""" maquette_url = sco_preferences.get_preference("maquette_url") if not maquette_url: # Default: portal_url = self.get_portal_url() if not portal_url: return None maquette_url = portal_url + "scodocMaquette.php" return maquette_url def get_portal_api_version(self): "API version of the portal software" api_ver = sco_preferences.get_preference("portal_api") if not api_ver: # Default: api_ver = 1 return api_ver _PI = PortalInterface() get_portal_url = _PI.get_portal_url get_etapes_url = _PI.get_etapes_url get_etud_url = _PI.get_etud_url get_photo_url = _PI.get_photo_url get_maquette_url = _PI.get_maquette_url get_portal_api_version = _PI.get_portal_api_version def get_inscrits_etape( code_etape, annee_apogee=None, ntrials=4, use_cache=True ) -> list[dict]: """Liste des inscrits à une étape Apogée Result = list of dicts ntrials: try several time the same request, useful for some bad web services use_cache: use (redis) cache """ log(f"get_inscrits_etape: code={code_etape} annee_apogee={annee_apogee}") if annee_apogee is None: annee_apogee = str(time.localtime()[0]) if use_cache: obj = ApoInscritsEtapeCache.get((code_etape, annee_apogee)) if obj: log("get_inscrits_etape: using cached data") return obj etud_url = get_etud_url() api_ver = get_portal_api_version() if not etud_url: return [] portal_timeout = sco_preferences.get_preference("portal_timeout") if api_ver > 1: req = ( etud_url + "?" + urllib.parse.urlencode((("etape", code_etape), ("annee", annee_apogee))) ) else: req = etud_url + "?" + urllib.parse.urlencode((("etape", code_etape),)) actual_timeout = float(portal_timeout) / ntrials if portal_timeout > 0: actual_timeout = max(1, actual_timeout) for _ntrial in range(ntrials): doc = scu.query_portal(req, timeout=actual_timeout) if doc: break if not doc: raise ScoValueError( f"pas de réponse du portail ! <br>(timeout={portal_timeout}, requête: <tt>{req}</tt>)" ) etuds = _normalize_apo_fields(xml_to_list_of_dicts(doc, req=req)) # Filtre sur annee inscription Apogee: def check_inscription(e): if "inscription" in e: if e["inscription"] == annee_apogee: return True else: return False else: log( "get_inscrits_etape: pas inscription dans code_etape=%s e=%s" % (code_etape, e) ) return False # ??? pas d'annee d'inscription dans la réponse etuds = [e for e in etuds if check_inscription(e)] if use_cache and etuds: ApoInscritsEtapeCache.set((code_etape, annee_apogee), etuds) return etuds def query_apogee_portal(**args): """Recupere les infos sur les etudiants nommés args: nom, prenom, code_nip (nom et prenom matchent des parties de noms) """ etud_url = get_etud_url() api_ver = get_portal_api_version() if not etud_url: return [] if api_ver > 1: if args["nom"] or args["prenom"]: # Ne fonctionne pas avec l'API 2 sur nom et prenom # XXX TODO : va poser problème pour la page modif données étudiants : A VOIR return [] portal_timeout = sco_preferences.get_preference("portal_timeout") req = etud_url + "?" + urllib.parse.urlencode(list(args.items())) doc = scu.query_portal(req, timeout=portal_timeout) # sco_utils return xml_to_list_of_dicts(doc, req=req) def xml_to_list_of_dicts(doc, req=None): """Convert an XML 1.0 str to a list of dicts.""" if not doc: return [] # Fix for buggy XML returned by some APIs (eg USPN) invalid_entities = { "&CCEDIL;": "Ç", "& ": "& ", # only when followed by a space (avoid affecting entities) # to be completed... } for k in invalid_entities: doc = doc.replace(k, invalid_entities[k]) # try: dom = xml.dom.minidom.parseString(doc) except xml.parsers.expat.ExpatError as e: # Find faulty part err_zone = doc.splitlines()[e.lineno - 1][e.offset : e.offset + 20] # catch bug: log and re-raise exception log( "xml_to_list_of_dicts: exception in XML parseString\ndoc:\n%s\n(end xml doc)\n" % doc ) raise ScoValueError( 'erreur dans la réponse reçue du portail ! (peut être : "%s")' % err_zone ) infos = [] try: if dom.childNodes[0].nodeName != "etudiants": raise ValueError etudiants = dom.getElementsByTagName("etudiant") for etudiant in etudiants: d = {} # recupere toutes les valeurs <valeur>XXX</valeur> for e in etudiant.childNodes: if e.nodeType == e.ELEMENT_NODE: childs = e.childNodes if len(childs): d[str(e.nodeName)] = childs[0].nodeValue infos.append(d) except: log("*** invalid XML response from Etudiant Web Service") log("req=%s" % req) log("doc=%s" % doc) raise ValueError("invalid XML response from Etudiant Web Service\n%s" % doc) return infos def get_infos_apogee_allaccents(nom, prenom): "essai recup infos avec differents codages des accents" if nom: nom_noaccents = scu.suppress_accents(nom) else: nom_noaccents = nom if prenom: prenom_noaccents = scu.suppress_accents(prenom) else: prenom_noaccents = prenom # avec accents infos = query_apogee_portal(nom=nom, prenom=prenom) # sans accents if nom != nom_noaccents or prenom != prenom_noaccents: infos += query_apogee_portal(nom=nom_noaccents, prenom=prenom_noaccents) return infos def get_infos_apogee(nom, prenom): """recupere les codes Apogee en utilisant le web service CRIT""" if (not nom) and (not prenom): return [] # essaie plusieurs codages: tirets, accents infos = get_infos_apogee_allaccents(nom, prenom) nom_st = nom.replace("-", " ") prenom_st = prenom.replace("-", " ") if nom_st != nom or prenom_st != prenom: infos += get_infos_apogee_allaccents(nom_st, prenom_st) # si pas de match et nom ou prenom composé, essaie en coupant if not infos: if nom: nom1 = nom.split()[0] else: nom1 = nom if prenom: prenom1 = prenom.split()[0] else: prenom1 = prenom if nom != nom1 or prenom != prenom1: infos += get_infos_apogee_allaccents(nom1, prenom1) return infos def get_etud_apogee(code_nip): """Informations à partir du code NIP. None si pas d'infos sur cet etudiant. Exception si reponse invalide. """ if not code_nip: return {} etud_url = get_etud_url() if not etud_url: return {} portal_timeout = sco_preferences.get_preference("portal_timeout") req = etud_url + "?" + urllib.parse.urlencode((("nip", code_nip),)) doc = scu.query_portal(req, timeout=portal_timeout) d = _normalize_apo_fields(xml_to_list_of_dicts(doc, req=req)) if not d: return None if len(d) > 1: log(f"get_etud_apogee({code_nip}): {len(d)} etudiants !\n{doc}") flash("Attention: plusieurs étudiants inscrits avec le NIP {code_nip}") # dans ce cas, renvoie le premier étudiant return d[0] def get_default_etapes(): """Liste par défaut, lue du fichier de config""" filename = scu.SCO_TOOLS_DIR + "/default-etapes.txt" log(f"get_default_etapes: reading {filename}") etapes = {} with open(filename, encoding=scu.SCO_ENCODING) as f: for line in f.readlines(): line = line.strip() if line and line[0] != "#": dept, code, intitule = [x.strip() for x in line.split(":")] if dept and code: if dept in etapes: etapes[dept][code] = intitule else: etapes[dept] = {code: intitule} return etapes def _parse_etapes_from_xml(doc): """ may raise exception if invalid xml doc """ xml_etapes_by_dept = sco_preferences.get_preference("xml_etapes_by_dept") # parser XML dom = xml.dom.minidom.parseString(doc) infos = {} if dom.childNodes[0].nodeName != "etapes": raise ValueError("élément 'etapes' attendu") if xml_etapes_by_dept: # Ancien format XML avec des sections par departement: for d in dom.childNodes[0].childNodes: if d.nodeType == d.ELEMENT_NODE: dept = d.nodeName _xml_list_codes(infos, dept, d.childNodes) else: # Toutes les étapes: dept = "" _xml_list_codes(infos, "", dom.childNodes[0].childNodes) return infos def get_etapes_apogee(): """Liste des etapes apogee { departement : { code_etape : intitule } } Demande la liste au portail, ou si échec utilise liste par défaut """ etapes_url = get_etapes_url() infos = {} if etapes_url: portal_timeout = sco_preferences.get_preference("portal_timeout") log( f"""get_etapes_apogee: requesting '{etapes_url}' with timeout={portal_timeout}""" ) doc = scu.query_portal(etapes_url, timeout=portal_timeout) try: infos = _parse_etapes_from_xml(doc) # cache le resultat (utile si le portail repond de façon intermitente) if infos: log("get_etapes_apogee: caching result") with open( SCO_CACHE_ETAPE_FILENAME, "w", encoding=scu.SCO_ENCODING ) as f: f.write(doc) except: log(f"invalid XML response from getEtapes Web Service\n{etapes_url}") # Avons-nous la copie d'une réponse récente ? try: doc = open(SCO_CACHE_ETAPE_FILENAME, encoding=scu.SCO_ENCODING).read() infos = _parse_etapes_from_xml(doc) log(f"using last saved version from {SCO_CACHE_ETAPE_FILENAME}") except: infos = {} else: # Pas de portail: utilise étapes par défaut livrées avec ScoDoc log("get_etapes_apogee: no configured URL (using default file)") infos = get_default_etapes() return infos def _xml_list_codes(target_dict, dept, nodes): for e in nodes: if e.nodeType == e.ELEMENT_NODE: intitule = e.childNodes[0].nodeValue code = e.attributes["code"].value if dept in target_dict: target_dict[dept][code] = intitule else: target_dict[dept] = {code: intitule} def get_etapes_apogee_dept(): """Liste des etapes apogee pour ce departement. Utilise la propriete 'portal_dept_name' pour identifier le departement. Si xml_etapes_by_dept est faux (nouveau format XML depuis sept 2014), le departement n'est pas utilisé: toutes les étapes sont présentées. Returns [ ( code, intitule) ], ordonnée """ xml_etapes_by_dept = sco_preferences.get_preference("xml_etapes_by_dept") if xml_etapes_by_dept: portal_dept_name = sco_preferences.get_preference("portal_dept_name") log('get_etapes_apogee_dept: portal_dept_name="%s"' % portal_dept_name) else: portal_dept_name = "" log("get_etapes_apogee_dept: pas de sections par departement") infos = get_etapes_apogee() if portal_dept_name and portal_dept_name not in infos: log( "get_etapes_apogee_dept: pas de section '%s' dans la reponse portail" % portal_dept_name ) return [] if portal_dept_name: etapes = list(infos[portal_dept_name].items()) else: # prend toutes les etapes etapes = [] for k in infos.keys(): etapes += list(infos[k].items()) etapes.sort() # tri sur le code etape return etapes def _portal_date_dmy2date(s): """date inscription renvoyée sous la forme dd/mm/yy renvoie un objet date, ou None """ s = s.strip() if not s: return None else: d, m, y = [int(x) for x in s.split("/")] # raises ValueError if bad format if y < 100: y += 2000 # 21ème siècle return datetime.date(y, m, d) def _normalize_apo_fields(infolist): """ infolist: liste de dict renvoyés par le portail Apogee recode les champs: paiementinscription (-> booleen), datefinalisationinscription (date) ajoute le champs 'paiementinscription_str' : 'ok', 'Non' ou '?' ajoute les champs 'etape' (= None) et 'prenom' ('') s'ils ne sont pas présents. ajoute le champ 'civilite_etat_civil' (=''), et 'prenom_etat_civil' (='') si non présent. """ for infos in infolist: if "paiementinscription" in infos: infos["paiementinscription"] = ( infos["paiementinscription"].lower() == "true" ) if infos["paiementinscription"]: infos["paiementinscription_str"] = "ok" else: infos["paiementinscription_str"] = "Non" else: infos["paiementinscription"] = None infos["paiementinscription_str"] = "?" if "datefinalisationinscription" in infos: infos["datefinalisationinscription"] = _portal_date_dmy2date( infos["datefinalisationinscription"] ) infos["datefinalisationinscription_str"] = infos[ "datefinalisationinscription" ].strftime("%d/%m/%Y") else: infos["datefinalisationinscription"] = None infos["datefinalisationinscription_str"] = "" if "etape" not in infos: infos["etape"] = None if "prenom" not in infos: infos["prenom"] = "" if "civilite_etat_civil" not in infos: infos["civilite_etat_civil"] = "" if "prenom_etat_civil" not in infos: infos["prenom_etat_civil"] = "" return infolist def check_paiement_etuds(etuds): """Interroge le portail pour vérifier l'état de "paiement" et l'étape d'inscription. Seuls les etudiants avec code NIP sont renseignés. Renseigne l'attribut booleen 'paiementinscription' dans chaque etud. En sortie: modif les champs de chaque etud 'paiementinscription' : True, False ou None 'paiementinscription_str' : 'ok', 'Non' ou '?' ou '(pas de code)' 'etape' : etape Apogee ou None """ # interrogation séquentielle longue... for etud in etuds: if "code_nip" not in etud: etud["paiementinscription"] = None etud["paiementinscription_str"] = "(pas de code)" etud["datefinalisationinscription"] = None etud["datefinalisationinscription_str"] = "NA" etud["etape"] = None else: # Modifie certains champs de l'étudiant: infos = get_etud_apogee(etud["code_nip"]) if infos: for k in ( "paiementinscription", "paiementinscription_str", "datefinalisationinscription", "datefinalisationinscription_str", "etape", ): etud[k] = infos[k] else: etud["datefinalisationinscription"] = None etud["datefinalisationinscription_str"] = "Erreur" etud["datefinalisationinscription"] = None etud["paiementinscription_str"] = "(pb cnx Apogée)" def get_maquette_apogee(etape="", annee_scolaire="") -> str: """Maquette CSV Apogee pour une étape et une annee scolaire""" maquette_url = get_maquette_url() if not maquette_url: return None portal_timeout = sco_preferences.get_preference("portal_timeout") req = ( maquette_url + "?" + urllib.parse.urlencode((("etape", etape), ("annee", annee_scolaire))) ) doc = scu.query_portal(req, timeout=portal_timeout) return doc