format description (in tools/) FORMAT_FILE = "format_import_etudiants.txt" # Champs modifiables via "Import données admission" # (nom/prénom modifiables en mode "avec etudid") ADMISSION_MODIFIABLE_FIELDS = ( "code_nip", "code_ine", "prenom_etat_civil", "civilite_etat_civil", "date_naissance", "lieu_naissance", "bac", "specialite", "annee_bac", "math", "physique", "anglais", "francais", "type_admission", "boursier_prec", "qualite", "rapporteur", "score", "commentaire", "classement", "apb_groupe", "apb_classement_gr", "nomlycee", "villelycee", "codepostallycee", "codelycee", # Adresse: "email", "emailperso", "domicile", "codepostaldomicile", "villedomicile", "paysdomicile", "telephone", "telephonemobile", # Groupes "groupes", ) # ---- def sco_import_format(with_codesemestre=True): "returns tuples (Attribut, Type, Table, AllowNulls, Description)" r = [] for l in open(os.path.join(scu.SCO_TOOLS_DIR, FORMAT_FILE)): l = l.strip() if l and l[0] != "#": fs = l.split(";") if len(fs) < 5: # Bug: invalid format file (fatal) raise ScoException( "file %s has invalid format (expected %d fields, got %d) (%s)" % (FORMAT_FILE, 5, len(fs), l) ) fieldname = ( fs[0].strip().lower().split()[0] ) # titre attribut: normalize, 1er mot seulement (nom du champ en BD) typ, table, allow_nulls, description = [x.strip() for x in fs[1:5]] aliases = [x.strip() for x in fs[5:] if x.strip()] if fieldname not in aliases: aliases.insert(0, fieldname) # prepend if with_codesemestre or fs[0] != "codesemestre": r.append( ( fieldname, typ, table, scu.to_bool(allow_nulls), description, aliases, ) ) return r def sco_import_format_dict(with_codesemestre=True, use_etudid: bool = False): """Attribut: { 'type': , 'table', 'allow_nulls' , 'description' }""" fmt = sco_import_format(with_codesemestre=with_codesemestre) formats = {} for l in fmt: formats[l[0]] = { "type": l[1], "table": l[2], "allow_nulls": l[3], "description": l[4], "aliases": l[5], } if use_etudid: formats["etudid"] = { "type": "int", "table": "identite", "allow_nulls": False, "description": "", "aliases": ["etudid", "id"], } return formats def sco_import_generate_excel_sample( fmt, with_codesemestre=True, only_tables=None, with_groups=True, exclude_cols=(), extra_cols=(), group_ids=(), ): """Generates an excel document based on format fmt (format is the result of sco_import_format()) If not None, only_tables can specify a list of sql table names (only columns from these tables will be generated) If group_ids, liste les etudiants de ces groupes """ style = sco_excel.excel_make_style(bold=True) style_required = sco_excel.excel_make_style(bold=True, color=COLORS.RED) titles = [] titles_styles = [] for l in fmt: name = l[0].lower() if (not with_codesemestre) and name == "codesemestre": continue # pas de colonne codesemestre if only_tables is not None and l[2].lower() not in only_tables: continue # table non demandée if name in exclude_cols: continue # colonne exclue if int(l[3]): titles_styles.append(style) else: titles_styles.append(style_required) titles.append(name) if with_groups and "groupes" not in titles: titles.append("groupes") titles_styles.append(style) titles += extra_cols titles_styles += [style] * len(extra_cols) if group_ids: groups_infos = sco_groups_view.DisplayedGroupsInfos(group_ids) members = groups_infos.members log( f"sco_import_generate_excel_sample: group_ids={group_ids}, {len(members)} members" ) titles = ["etudid"] + titles titles_styles = [style] + titles_styles # rempli table avec données actuelles lines = [] for i in members: etud = sco_etud.get_etud_info(etudid=i["etudid"], filled=True)[0] l = [] for field in titles: if field == "groupes": sco_groups.etud_add_group_infos( etud, groups_infos.formsemestre_id, sep=";", with_default_partition=False, ) l.append(etud["partitionsgroupes"]) else: key = field.lower().split()[0] l.append(etud.get(key, "")) lines.append(l) else: lines = [[]] # empty content, titles only return sco_excel.excel_simple_table( titles=titles, titles_styles=titles_styles, sheet_name="Étudiants", lines=lines ) def students_import_excel( csvfile, formsemestre_id=None, check_homonyms=True, require_ine=False, return_html=True, ) -> str: "import students from Excel file" diag = scolars_import_excel_file( csvfile, formsemestre_id=formsemestre_id, check_homonyms=check_homonyms, require_ine=require_ine, exclude_cols=["photo_filename"], ) if return_html: dest_url = ( url_for( "notes.formsemestre_status", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ) if formsemestre_id else url_for("notes.index_html", scodoc_dept=g.scodoc_dept) ) return render_template( "sco_page.j2", title="Import etudiants", content=f""" <h2>Import etudiants</h2> <p>Import terminé !</p> <ul> {''.join('<li>' + d + '</li>' for d in diag)} </ul> <p><a class="stdlink" href="{dest_url}">Continuer</a></p> """, ) return "" def scolars_import_excel_file( datafile: io.BytesIO, formsemestre_id=None, check_homonyms=True, require_ine=False, exclude_cols=(), ): """Importe etudiants depuis fichier Excel et les inscrit dans le semestre indiqué (et à TOUS ses modules) """ log(f"scolars_import_excel_file: {formsemestre_id}") cnx = ndb.GetDBConnexion() cursor = cnx.cursor(cursor_factory=ndb.ScoDocCursor) annee_courante = time.localtime()[0] always_require_ine = sco_preferences.get_preference("always_require_ine") exceldata = datafile.read() if not exceldata: raise ScoValueError("Ficher excel vide ou invalide") diag, data = sco_excel.excel_bytes_to_list(exceldata) if not data: # probably a bug raise ScoException("scolars_import_excel_file: empty file !") formsemestre_to_invalidate = set() # 1- --- check title line titles = {} fmt = sco_import_format() for l in fmt: tit = l[0].lower().split()[0] # titles in lowercase, and take 1st word if ( (not formsemestre_id) or (tit != "codesemestre") ) and tit not in exclude_cols: titles[tit] = l[1:] # title : (Type, Table, AllowNulls, Description) # remove quotes, downcase and keep only 1st word try: fs = [scu.stripquotes(s).lower().split()[0] for s in data[0]] except Exception as exc: raise ScoValueError("Titres de colonnes invalides (ou vides ?)") from exc # check columns titles if len(fs) != len(titles): missing = {}.fromkeys(list(titles.keys())) unknown = [] for f in fs: if f in missing: del missing[f] else: unknown.append(f) raise ScoValueError( """Nombre de colonnes incorrect (devrait être %d, et non %d)<br> (colonnes manquantes: %s, colonnes invalides: %s)""" % (len(titles), len(fs), list(missing.keys()), unknown) ) titleslist = [] for t in fs: if t not in titles: raise ScoValueError(f'Colonne invalide: "{t}"') titleslist.append(t) # # ok, same titles # Start inserting data, abort whole transaction in case of error created_etudids = [] np_imported_homonyms = 0 group_id_inferer = {} try: # --- begin DB transaction linenum = 0 for line in data[1:]: linenum += 1 # Read fields, check and convert type values = {} fs = line # remove quotes for i, field in enumerate(fs): if field and ( (field[0] == '"' and field[-1] == '"') or (field[0] == "'" and field[-1] == "'") ): fs[i] = field[1:-1] for i, field in enumerate(fs): val = field.strip() typ, table, allow_nulls, descr, aliases = tuple(titles[titleslist[i]]) if not val and not allow_nulls: raise ScoValueError( f"line {linenum}: null value not allowed in column {titleslist[i]}" ) if val == "": val = None else: if typ == "real": val = val.replace(",", ".") # si virgule a la française try: val = float(val) except (ValueError, TypeError) as exc: raise ScoValueError( f"""valeur nombre reel invalide ({ val}) sur ligne {linenum}, colonne {titleslist[i]}""" ) from exc elif typ == "integer": try: # on doit accepter des valeurs comme "2006.0" val = val.replace(",", ".") # si virgule a la française val = float(val) if val % 1.0 > 1e-4: raise ValueError() val = int(val) except (ValueError, TypeError) as exc: raise ScoValueError( f"""valeur nombre entier invalide ({ val}) sur ligne {linenum}, colonne {titleslist[i]}""" ) from exc # Ad-hoc checks (should be in format description) if titleslist[i].lower() == "civilite": try: val = input_civilite(val) except ScoValueError as exc: raise ScoValueError( f"""valeur invalide pour 'civilite' (doit etre 'M', 'F', ou 'MME', 'H', 'X' mais pas '{ val}') ligne {linenum}, colonne {titleslist[i]}""" ) from exc if titleslist[i].lower() == "civilite_etat_civil": try: val = input_civilite_etat_civil(val) except ScoValueError as exc: raise ScoValueError( f"""valeur invalide pour 'civilite' (doit etre 'M', 'F', vide ou 'MME', 'H', 'X' mais pas '{ val}') ligne {linenum}, colonne {titleslist[i]}""" ) from exc # Excel date conversion: if titleslist[i].lower() == "date_naissance": if val: try: val = sco_excel.xldate_as_datetime(val) except ValueError as exc: raise ScoValueError( f"""date invalide ({val}) sur ligne { linenum}, colonne {titleslist[i]}""" ) from exc # INE if ( titleslist[i].lower() == "code_ine" and always_require_ine and not val ): raise ScoValueError( "Code INE manquant sur ligne {linenum}, colonne {titleslist[i]}" ) # -- values[titleslist[i]] = val skip = False is_new_ine = values["code_ine"] and _is_new_ine(cnx, values["code_ine"]) if require_ine and not is_new_ine: log("skipping %s (code_ine=%s)" % (values["nom"], values["code_ine"])) skip = True if not skip: if values["code_ine"] and not is_new_ine: raise ScoValueError("Code INE dupliqué (%s)" % values["code_ine"]) # Check nom/prenom ok = False homonyms = [] if "nom" in values and "prenom" in values: ok, homonyms = sco_etud.check_nom_prenom_homonyms( nom=values["nom"], prenom=values["prenom"] ) if not ok: raise ScoValueError( f"nom ou prénom invalide sur la ligne {linenum}" ) if homonyms: np_imported_homonyms += 1 # Insert in DB tables _import_one_student( formsemestre_id, values, group_id_inferer, annee_courante, created_etudids, linenum, ) # Verification proportion d'homonymes: si > 10%, abandonne log(f"scolars_import_excel_file: detected {np_imported_homonyms} homonyms") if check_homonyms and np_imported_homonyms > len(created_etudids) / 10: log("scolars_import_excel_file: too many homonyms") raise ScoValueError( f"Il y a trop d'homonymes ({np_imported_homonyms} étudiants)" ) except: cnx.rollback() log("scolars_import_excel_file: aborting transaction !") # Nota: db transaction is sometimes partly commited... # here we try to remove all created students cursor = cnx.cursor(cursor_factory=ndb.ScoDocCursor) for etudid in created_etudids: log(f"scolars_import_excel_file: deleting etudid={etudid}") cursor.execute( "delete from notes_moduleimpl_inscription where etudid=%(etudid)s", {"etudid": etudid}, ) cursor.execute( "delete from notes_formsemestre_inscription where etudid=%(etudid)s", {"etudid": etudid}, ) cursor.execute( "delete from scolar_events where etudid=%(etudid)s", {"etudid": etudid} ) cursor.execute( "delete from adresse where etudid=%(etudid)s", {"etudid": etudid} ) cursor.execute( "delete from group_membership where etudid=%(etudid)s", {"etudid": etudid}, ) cursor.execute( "delete from identite where id=%(etudid)s", {"etudid": etudid} ) cnx.commit() log("scolars_import_excel_file: re-raising exception") raise diag.append(f"Import et inscription de {len(created_etudids)} étudiants") ScolarNews.add( typ=ScolarNews.NEWS_INSCR, text=f"Inscription de {len(created_etudids)} étudiants", # peuvent avoir ete inscrits a des semestres differents obj=formsemestre_id, max_frequency=0, ) log("scolars_import_excel_file: completing transaction") cnx.commit() # Invalide les caches des semestres dans lesquels on a inscrit des etudiants: for fid in formsemestre_to_invalidate: sco_cache.invalidate_formsemestre(formsemestre_id=fid) return diag def students_import_admission( csvfile, type_admission="", formsemestre_id=None, return_html=True, use_etudid=False ) -> str: "import donnees admission from Excel file (v2016)" diag = scolars_import_admission( csvfile, formsemestre_id=formsemestre_id, type_admission=type_admission, use_etudid=use_etudid, ) if return_html: H = ["<p>Import terminé !</p>"] H.append( f"""<p><a class="stdlink" href="{ url_for( "notes.formsemestre_status", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ) }">Continuer</a></p>""" ) if diag: H.append( f"""<p>Diagnostic: <ul><li>{ "</li><li>".join(diag) }</li></ul></p> """ ) return render_template( "sco_page.j2", title="Import données admissions", content="\n".join(H) ) return "" def _import_one_student( formsemestre_id, values, GroupIdInferers, annee_courante, created_etudids, linenum, ) -> int: """ Import d'un étudiant et inscription dans le semestre. Return: id du semestre dans lequel il a été inscrit. """ log(f"scolars_import_excel_file: formsemestre_id={formsemestre_id} values={values}") # Identite args = values.copy() args["annee"] = annee_courante etud: Identite = Identite.create_from_dict(args) etud.admission.from_dict(args) etudid = etud.id created_etudids.append(etudid) # Adresse args["typeadresse"] = "domicile" args["description"] = "(infos admission)" adresse = etud.adresses.first() adresse.from_dict(args) db.session.add(etud) db.session.commit() # Inscription au semestre args["etat"] = scu.INSCRIT # etat insc. semestre if formsemestre_id: args["formsemestre_id"] = formsemestre_id else: args["formsemestre_id"] = values["codesemestre"] formsemestre_id = values["codesemestre"] try: formsemestre_id = int(formsemestre_id) except (ValueError, TypeError) as exc: raise ScoValueError( f"valeur invalide ou manquante dans la colonne codesemestre, ligne {linenum+1}" ) from exc # recupere liste des groupes: if formsemestre_id not in GroupIdInferers: GroupIdInferers[formsemestre_id] = sco_groups.GroupIdInferer(formsemestre_id) gi = GroupIdInferers[formsemestre_id] if args["groupes"]: groupes = args["groupes"].split(";") else: groupes = [] group_ids = [gi[group_name] for group_name in groupes] group_ids = list({}.fromkeys(group_ids).keys()) # uniq if None in group_ids: raise ScoValueError( f"groupe invalide sur la ligne {linenum} (groupe {groupes})" ) do_formsemestre_inscription_with_modules( int(args["formsemestre_id"]), etudid, group_ids, etat=scu.INSCRIT, method="import_csv_file", ) return args["formsemestre_id"] def _is_new_ine(cnx, code_ine): "True if this code is not in DB" etuds = sco_etud.identite_list(cnx, {"code_ine": code_ine}) return not etuds # ------ Fonction ré-écrite en nov 2016 pour lire des fichiers sans etudid (fichiers APB) def scolars_import_admission( datafile, formsemestre_id=None, type_admission=None, use_etudid=False ): """Importe données admission depuis un fichier Excel quelconque par exemple ceux utilisés avec APB, avec ou sans etudid Cherche dans ce fichier les étudiants qui correspondent à des inscrits du semestre formsemestre_id. Si le fichier n'a pas d'etudid (use_etudid faux), la correspondance se fait via les noms/prénoms qui doivent être égaux (la casse, les accents et caractères spéciaux étant ignorés). On tolère plusieurs variantes pour chaque nom de colonne (ici aussi, la casse, les espaces et les caractères spéciaux sont ignorés. Ainsi, la colonne "Prénom:" sera considéré comme "prenom". Le parametre type_admission remplace les valeurs vides (dans la base ET dans le fichier importé) du champ type_admission. Si une valeur existe ou est présente dans le fichier importé, ce paramètre est ignoré. """ log(f"scolars_import_admission: formsemestre_id={formsemestre_id}") diag: list[str] = [] members = sco_groups.get_group_members( sco_groups.get_default_group(formsemestre_id) ) etuds_by_nomprenom = {} # { nomprenom : etud } etuds_by_etudid = {} # { etudid : etud } if use_etudid: etuds_by_etudid = {m["etudid"]: m for m in members} else: for m in members: np = (adm_normalize_string(m["nom"]), adm_normalize_string(m["prenom"])) if np in etuds_by_nomprenom: msg = f"""Attention: hononymie pour {m["nom"]} {m["prenom"]}""" log(msg) diag.append(msg) etuds_by_nomprenom[np] = m exceldata = datafile.read() diag2, data = sco_excel.excel_bytes_to_list(exceldata) if not data: raise ScoException("scolars_import_admission: empty file !") diag += diag2 cnx = ndb.GetDBConnexion() titles = data[0] # idx -> ('field', convertor) fields = adm_get_fields(titles, formsemestre_id, use_etudid=use_etudid) idx_nom = idx_prenom = idx_etudid = None for idx, field in fields.items(): match field[0]: case "nom": idx_nom = idx case "prenom": idx_prenom = idx case "etudid": idx_etudid = idx if (not use_etudid and ((idx_nom is None) or (idx_prenom is None))) or ( use_etudid and idx_etudid is None ): log("fields indices=" + ", ".join([str(x) for x in fields])) log("fields titles =" + ", ".join([x[0] for x in fields.values()])) raise ScoFormatError( ( """colonne etudid requise (si l'option "Utiliser l'identifiant d'étudiant ScoDoc" est cochée)""" if use_etudid else "colonnes nom et prenom requises" ), dest_url=url_for( "scolar.form_import_infos_admissions", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ), ) modifiable_fields = set(ADMISSION_MODIFIABLE_FIELDS) if use_etudid: modifiable_fields |= {"nom", "prenom"} nline = 2 # la premiere ligne de donnees du fichier excel est 2 n_import = 0 for line in data[1:]: if use_etudid: try: etud = etuds_by_etudid.get(int(line[idx_etudid])) except ValueError: etud = None if not etud: msg = f"""Étudiant avec code etudid=<b>{line[idx_etudid]}</b> inexistant""" diag.append(msg) else: # Retrouve l'étudiant parmi ceux du semestre par (nom, prenom) nom = adm_normalize_string(line[idx_nom]) prenom = adm_normalize_string(line[idx_prenom]) etud = etuds_by_nomprenom.get((nom, prenom)) if not etud: msg = ( f"""Étudiant <b>{line[idx_nom]} {line[idx_prenom]}</b> inexistant""" ) diag.append(msg) if etud: cur_adm = sco_etud.admission_list(cnx, args={"id": etud["admission_id"]})[0] # peuple les champs presents dans le tableau args = {} for idx, field in fields.items(): field_name, convertor = field if field_name in modifiable_fields: try: val = convertor(line[idx]) except ValueError as exc: raise ScoFormatError( f"""scolars_import_admission: valeur invalide, ligne { nline} colonne {field_name}: '{line[idx]}'""", dest_url=url_for( "scolar.form_import_infos_admissions", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ), ) from exc if val is not None: # note: ne peut jamais supprimer une valeur args[field_name] = val if args: args["etudid"] = etud["etudid"] args["adm_id"] = cur_adm["adm_id"] # Type admission: traitement particulier if not cur_adm["type_admission"] and not args.get("type_admission"): args["type_admission"] = type_admission sco_etud.etudident_edit( # TODO utiliser modèle cnx, args, disable_notify=True ) adr = sco_etud.adresse_list(cnx, args={"etudid": etud["etudid"]}) if adr: args["adresse_id"] = adr[0]["adresse_id"] sco_etud.adresse_edit( # TODO utiliser modèle cnx, args, disable_notify=True ) # pas de notification ici else: args["typeadresse"] = "domicile" args["description"] = "(infos admission)" adresse_id = sco_etud.adresse_create( # TODO utiliser modèle cnx, args ) # log('import_adm: %s' % args ) # Change les groupes si nécessaire: if "groupes" in args: gi = sco_groups.GroupIdInferer(formsemestre_id) groupes = args["groupes"].split(";") group_ids = [gi[group_name] for group_name in groupes] group_ids = list({}.fromkeys(group_ids).keys()) # uniq if None in group_ids: raise ScoValueError( f"groupe invalide sur la ligne {nline} (groupes {groupes})" ) for group_id in group_ids: group: GroupDescr = GroupDescr.get_instance(group_id) if group.partition.groups_editable: sco_groups.change_etud_group_in_partition( args["etudid"], group ) elif not group.partition.is_parcours: log("scolars_import_admission: partition non editable") diag.append( f"""Attention: partition { group.partition} (g{group.id}) non editable et ignorée""" ) # diag.append(f"import de {etud['nomprenom']}") n_import += 1 nline += 1 diag.append(f"{n_import} lignes importées") if n_import > 0: sco_cache.invalidate_formsemestre(formsemestre_id=formsemestre_id) return diag _ADM_PATTERN = re.compile(r"[\W]+", re.UNICODE) # supprime tout sauf alphanum def adm_normalize_string(s): "normalize unicode title" return scu.suppress_accents(_ADM_PATTERN.sub("", s.strip().lower())).replace( "_", "" ) def adm_get_fields(titles, formsemestre_id: int, use_etudid: bool = False): """Cherche les colonnes importables dans les titres (ligne 1) du fichier excel return: { idx : (field_name, convertor) } """ format_dict = sco_import_format_dict(use_etudid=use_etudid) fields = {} idx = 0 for title in titles: title_n = adm_normalize_string(title) for k, fmt in format_dict.items(): for v in fmt["aliases"]: if adm_normalize_string(v) == title_n: typ = fmt["type"] if typ == "real": convertor = adm_convert_real elif typ == "integer" or typ == "int": convertor = adm_convert_int else: convertor = adm_convert_text # doublons ? if k in [x[0] for x in fields.values()]: raise ScoFormatError( f"""scolars_import_admission: titre "{title}" en double (ligne 1)""", dest_url=url_for( "scolar.form_import_infos_admissions", scodoc_dept=g.scodoc_dept, formsemestre_id=formsemestre_id, ), ) fields[idx] = (k, convertor) idx += 1 return fields def adm_convert_text(v): if isinstance(v, float): return "{:g}".format(v) # evite "1.0" return v def adm_convert_int(v): if type(v) != int and not v: return None return int(float(v)) # accept "10.0" def adm_convert_real(v): if type(v) != float and not v: return None return float(v) def adm_table_description_format(): """Table HTML (ou autre format) decrivant les donnees d'admissions importables""" Fmt = sco_import_format_dict(with_codesemestre=False) for k in Fmt: Fmt[k]["attribute"] = k Fmt[k]["aliases_str"] = ", ".join(Fmt[k]["aliases"]) if not Fmt[k]["allow_nulls"]: Fmt[k]["required"] = "*" if k in ADMISSION_MODIFIABLE_FIELDS: Fmt[k]["writable"] = "oui" else: Fmt[k]["writable"] = "non" titles = { "attribute": "Attribut", "type": "Type", "required": "Requis", "writable": "Modifiable", "description": "Description", "aliases_str": "Titres (variantes)", } columns_ids = ("attribute", "type", "writable", "description", "aliases_str") tab = GenTable( columns_ids=columns_ids, html_class="table_leftalign", html_sortable=True, preferences=sco_preferences.SemPreferences(), rows=list(Fmt.values()), table_id="adm_table_description_format", titles=titles, ) return tab