forked from ScoDoc/ScoDoc
837 lines
30 KiB
Python
837 lines
30 KiB
Python
# -*- mode: python -*-
|
|
# -*- coding: utf-8 -*-
|
|
|
|
##############################################################################
|
|
#
|
|
# Gestion scolarite IUT
|
|
#
|
|
# Copyright (c) 1999 - 2023 Emmanuel Viennet. All rights reserved.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
# Emmanuel Viennet emmanuel.viennet@viennet.net
|
|
#
|
|
##############################################################################
|
|
|
|
""" Importation des étudiants à partir de fichiers CSV
|
|
"""
|
|
|
|
import collections
|
|
import io
|
|
import os
|
|
import re
|
|
import time
|
|
|
|
from flask import g, url_for
|
|
|
|
from app import db, log
|
|
from app.models import ScolarNews, GroupDescr
|
|
from app.models.etudiants import input_civilite
|
|
|
|
from app.scodoc.gen_tables import GenTable
|
|
from app.scodoc.sco_excel import COLORS
|
|
from app.scodoc.sco_exceptions import (
|
|
AccessDenied,
|
|
ScoFormatError,
|
|
ScoException,
|
|
ScoValueError,
|
|
ScoInvalidDateError,
|
|
ScoLockedFormError,
|
|
ScoGenError,
|
|
)
|
|
from app.scodoc import html_sco_header
|
|
from app.scodoc import sco_cache
|
|
from app.scodoc import sco_etud
|
|
from app.scodoc import sco_groups
|
|
from app.scodoc import sco_excel
|
|
from app.scodoc import sco_groups_view
|
|
from app.scodoc import sco_preferences
|
|
import app.scodoc.notesdb as ndb
|
|
from app.scodoc.sco_formsemestre_inscriptions import (
|
|
do_formsemestre_inscription_with_modules,
|
|
)
|
|
import app.scodoc.sco_utils as scu
|
|
|
|
# format description (in tools/)
|
|
FORMAT_FILE = "format_import_etudiants.txt"
|
|
|
|
# Champs modifiables via "Import données admission"
|
|
ADMISSION_MODIFIABLE_FIELDS = (
|
|
"code_nip",
|
|
"code_ine",
|
|
"prenom_etat_civil",
|
|
"civilite_etat_civil",
|
|
"date_naissance",
|
|
"lieu_naissance",
|
|
"bac",
|
|
"specialite",
|
|
"annee_bac",
|
|
"math",
|
|
"physique",
|
|
"anglais",
|
|
"francais",
|
|
"type_admission",
|
|
"boursier_prec",
|
|
"qualite",
|
|
"rapporteur",
|
|
"score",
|
|
"commentaire",
|
|
"classement",
|
|
"apb_groupe",
|
|
"apb_classement_gr",
|
|
"nomlycee",
|
|
"villelycee",
|
|
"codepostallycee",
|
|
"codelycee",
|
|
# Adresse:
|
|
"email",
|
|
"emailperso",
|
|
"domicile",
|
|
"codepostaldomicile",
|
|
"villedomicile",
|
|
"paysdomicile",
|
|
"telephone",
|
|
"telephonemobile",
|
|
# Groupes
|
|
"groupes",
|
|
)
|
|
|
|
# ----
|
|
|
|
|
|
def sco_import_format(with_codesemestre=True):
|
|
"returns tuples (Attribut, Type, Table, AllowNulls, Description)"
|
|
r = []
|
|
for l in open(os.path.join(scu.SCO_TOOLS_DIR, FORMAT_FILE)):
|
|
l = l.strip()
|
|
if l and l[0] != "#":
|
|
fs = l.split(";")
|
|
if len(fs) < 5:
|
|
# Bug: invalid format file (fatal)
|
|
raise ScoException(
|
|
"file %s has invalid format (expected %d fields, got %d) (%s)"
|
|
% (FORMAT_FILE, 5, len(fs), l)
|
|
)
|
|
fieldname = (
|
|
fs[0].strip().lower().split()[0]
|
|
) # titre attribut: normalize, 1er mot seulement (nom du champ en BD)
|
|
typ, table, allow_nulls, description = [x.strip() for x in fs[1:5]]
|
|
aliases = [x.strip() for x in fs[5:] if x.strip()]
|
|
if fieldname not in aliases:
|
|
aliases.insert(0, fieldname) # prepend
|
|
if with_codesemestre or fs[0] != "codesemestre":
|
|
r.append((fieldname, typ, table, allow_nulls, description, aliases))
|
|
return r
|
|
|
|
|
|
def sco_import_format_dict(with_codesemestre=True):
|
|
"""Attribut: { 'type': , 'table', 'allow_nulls' , 'description' }"""
|
|
fmt = sco_import_format(with_codesemestre=with_codesemestre)
|
|
R = collections.OrderedDict()
|
|
for l in fmt:
|
|
R[l[0]] = {
|
|
"type": l[1],
|
|
"table": l[2],
|
|
"allow_nulls": l[3],
|
|
"description": l[4],
|
|
"aliases": l[5],
|
|
}
|
|
return R
|
|
|
|
|
|
def sco_import_generate_excel_sample(
|
|
fmt,
|
|
with_codesemestre=True,
|
|
only_tables=None,
|
|
with_groups=True,
|
|
exclude_cols=(),
|
|
extra_cols=(),
|
|
group_ids=(),
|
|
):
|
|
"""Generates an excel document based on format fmt
|
|
(format is the result of sco_import_format())
|
|
If not None, only_tables can specify a list of sql table names
|
|
(only columns from these tables will be generated)
|
|
If group_ids, liste les etudiants de ces groupes
|
|
"""
|
|
style = sco_excel.excel_make_style(bold=True)
|
|
style_required = sco_excel.excel_make_style(bold=True, color=COLORS.RED)
|
|
titles = []
|
|
titles_styles = []
|
|
for l in fmt:
|
|
name = l[0].lower()
|
|
if (not with_codesemestre) and name == "codesemestre":
|
|
continue # pas de colonne codesemestre
|
|
if only_tables is not None and l[2].lower() not in only_tables:
|
|
continue # table non demandée
|
|
if name in exclude_cols:
|
|
continue # colonne exclue
|
|
if int(l[3]):
|
|
titles_styles.append(style)
|
|
else:
|
|
titles_styles.append(style_required)
|
|
titles.append(name)
|
|
if with_groups and "groupes" not in titles:
|
|
titles.append("groupes")
|
|
titles_styles.append(style)
|
|
titles += extra_cols
|
|
titles_styles += [style] * len(extra_cols)
|
|
if group_ids:
|
|
groups_infos = sco_groups_view.DisplayedGroupsInfos(group_ids)
|
|
members = groups_infos.members
|
|
log(
|
|
"sco_import_generate_excel_sample: group_ids=%s %d members"
|
|
% (group_ids, len(members))
|
|
)
|
|
titles = ["etudid"] + titles
|
|
titles_styles = [style] + titles_styles
|
|
# rempli table avec données actuelles
|
|
lines = []
|
|
for i in members:
|
|
etud = sco_etud.get_etud_info(etudid=i["etudid"], filled=True)[0]
|
|
l = []
|
|
for field in titles:
|
|
if field == "groupes":
|
|
sco_groups.etud_add_group_infos(
|
|
etud,
|
|
groups_infos.formsemestre_id,
|
|
sep=";",
|
|
with_default_partition=False,
|
|
)
|
|
l.append(etud["partitionsgroupes"])
|
|
else:
|
|
key = field.lower().split()[0]
|
|
l.append(etud.get(key, ""))
|
|
lines.append(l)
|
|
else:
|
|
lines = [[]] # empty content, titles only
|
|
return sco_excel.excel_simple_table(
|
|
titles=titles, titles_styles=titles_styles, sheet_name="Étudiants", lines=lines
|
|
)
|
|
|
|
|
|
def students_import_excel(
|
|
csvfile,
|
|
formsemestre_id=None,
|
|
check_homonyms=True,
|
|
require_ine=False,
|
|
return_html=True,
|
|
):
|
|
"import students from Excel file"
|
|
diag = scolars_import_excel_file(
|
|
csvfile,
|
|
formsemestre_id=formsemestre_id,
|
|
check_homonyms=check_homonyms,
|
|
require_ine=require_ine,
|
|
exclude_cols=["photo_filename"],
|
|
)
|
|
if return_html:
|
|
if formsemestre_id:
|
|
dest = url_for(
|
|
"notes.formsemestre_status",
|
|
scodoc_dept=g.scodoc_dept,
|
|
formsemestre_id=formsemestre_id,
|
|
)
|
|
else:
|
|
dest = url_for("notes.index_html", scodoc_dept=g.scodoc_dept)
|
|
H = [html_sco_header.sco_header(page_title="Import etudiants")]
|
|
H.append("<ul>")
|
|
for d in diag:
|
|
H.append("<li>%s</li>" % d)
|
|
H.append("</ul>")
|
|
H.append("<p>Import terminé !</p>")
|
|
H.append('<p><a class="stdlink" href="%s">Continuer</a></p>' % dest)
|
|
return "\n".join(H) + html_sco_header.sco_footer()
|
|
|
|
|
|
def scolars_import_excel_file(
|
|
datafile: io.BytesIO,
|
|
formsemestre_id=None,
|
|
check_homonyms=True,
|
|
require_ine=False,
|
|
exclude_cols=(),
|
|
):
|
|
"""Importe etudiants depuis fichier Excel
|
|
et les inscrit dans le semestre indiqué (et à TOUS ses modules)
|
|
"""
|
|
log(f"scolars_import_excel_file: {formsemestre_id}")
|
|
cnx = ndb.GetDBConnexion()
|
|
cursor = cnx.cursor(cursor_factory=ndb.ScoDocCursor)
|
|
annee_courante = time.localtime()[0]
|
|
always_require_ine = sco_preferences.get_preference("always_require_ine")
|
|
exceldata = datafile.read()
|
|
if not exceldata:
|
|
raise ScoValueError("Ficher excel vide ou invalide")
|
|
diag, data = sco_excel.excel_bytes_to_list(exceldata)
|
|
if not data: # probably a bug
|
|
raise ScoException("scolars_import_excel_file: empty file !")
|
|
|
|
formsemestre_to_invalidate = set()
|
|
# 1- --- check title line
|
|
titles = {}
|
|
fmt = sco_import_format()
|
|
for l in fmt:
|
|
tit = l[0].lower().split()[0] # titles in lowercase, and take 1st word
|
|
if (
|
|
(not formsemestre_id) or (tit != "codesemestre")
|
|
) and tit not in exclude_cols:
|
|
titles[tit] = l[1:] # title : (Type, Table, AllowNulls, Description)
|
|
|
|
# remove quotes, downcase and keep only 1st word
|
|
try:
|
|
fs = [scu.stripquotes(s).lower().split()[0] for s in data[0]]
|
|
except Exception as exc:
|
|
raise ScoValueError("Titres de colonnes invalides (ou vides ?)") from exc
|
|
|
|
# check columns titles
|
|
if len(fs) != len(titles):
|
|
missing = {}.fromkeys(list(titles.keys()))
|
|
unknown = []
|
|
for f in fs:
|
|
if f in missing:
|
|
del missing[f]
|
|
else:
|
|
unknown.append(f)
|
|
raise ScoValueError(
|
|
"""Nombre de colonnes incorrect (devrait être %d, et non %d)<br>
|
|
(colonnes manquantes: %s, colonnes invalides: %s)"""
|
|
% (len(titles), len(fs), list(missing.keys()), unknown)
|
|
)
|
|
titleslist = []
|
|
for t in fs:
|
|
if t not in titles:
|
|
raise ScoValueError('Colonne invalide: "%s"' % t)
|
|
titleslist.append(t) #
|
|
# ok, same titles
|
|
# Start inserting data, abort whole transaction in case of error
|
|
created_etudids = []
|
|
np_imported_homonyms = 0
|
|
GroupIdInferers = {}
|
|
try: # --- begin DB transaction
|
|
linenum = 0
|
|
for line in data[1:]:
|
|
linenum += 1
|
|
# Read fields, check and convert type
|
|
values = {}
|
|
fs = line
|
|
# remove quotes
|
|
for i in range(len(fs)):
|
|
if fs[i] and (
|
|
(fs[i][0] == '"' and fs[i][-1] == '"')
|
|
or (fs[i][0] == "'" and fs[i][-1] == "'")
|
|
):
|
|
fs[i] = fs[i][1:-1]
|
|
for i in range(len(fs)):
|
|
val = fs[i].strip()
|
|
typ, table, an, descr, aliases = tuple(titles[titleslist[i]])
|
|
# log('field %s: %s %s %s %s'%(titleslist[i], table, typ, an, descr))
|
|
if not val and not an:
|
|
raise ScoValueError(
|
|
"line %d: null value not allowed in column %s"
|
|
% (linenum, titleslist[i])
|
|
)
|
|
if val == "":
|
|
val = None
|
|
else:
|
|
if typ == "real":
|
|
val = val.replace(",", ".") # si virgule a la française
|
|
try:
|
|
val = float(val)
|
|
except:
|
|
raise ScoValueError(
|
|
"valeur nombre reel invalide (%s) sur line %d, colonne %s"
|
|
% (val, linenum, titleslist[i])
|
|
)
|
|
elif typ == "integer":
|
|
try:
|
|
# on doit accepter des valeurs comme "2006.0"
|
|
val = val.replace(",", ".") # si virgule a la française
|
|
val = float(val)
|
|
if val % 1.0 > 1e-4:
|
|
raise ValueError()
|
|
val = int(val)
|
|
except:
|
|
raise ScoValueError(
|
|
"valeur nombre entier invalide (%s) sur ligne %d, colonne %s"
|
|
% (val, linenum, titleslist[i])
|
|
)
|
|
# xxx Ad-hoc checks (should be in format description)
|
|
if titleslist[i].lower() == "sexe":
|
|
try:
|
|
val = input_civilite(val)
|
|
except:
|
|
raise ScoValueError(
|
|
"valeur invalide pour 'SEXE' (doit etre 'M', 'F', ou 'MME', 'H', 'X' ou vide, mais pas '%s') ligne %d, colonne %s"
|
|
% (val, linenum, titleslist[i])
|
|
)
|
|
# Excel date conversion:
|
|
if titleslist[i].lower() == "date_naissance":
|
|
if val:
|
|
try:
|
|
val = sco_excel.xldate_as_datetime(val)
|
|
except ValueError as exc:
|
|
raise ScoValueError(
|
|
f"date invalide ({val}) sur ligne {linenum}, colonne {titleslist[i]}"
|
|
) from exc
|
|
# INE
|
|
if (
|
|
titleslist[i].lower() == "code_ine"
|
|
and always_require_ine
|
|
and not val
|
|
):
|
|
raise ScoValueError(
|
|
"Code INE manquant sur ligne %d, colonne %s"
|
|
% (linenum, titleslist[i])
|
|
)
|
|
|
|
# --
|
|
values[titleslist[i]] = val
|
|
skip = False
|
|
is_new_ine = values["code_ine"] and _is_new_ine(cnx, values["code_ine"])
|
|
if require_ine and not is_new_ine:
|
|
log("skipping %s (code_ine=%s)" % (values["nom"], values["code_ine"]))
|
|
skip = True
|
|
|
|
if not skip:
|
|
if values["code_ine"] and not is_new_ine:
|
|
raise ScoValueError("Code INE dupliqué (%s)" % values["code_ine"])
|
|
# Check nom/prenom
|
|
ok = False
|
|
if "nom" in values and "prenom" in values:
|
|
ok, nb_homonyms = sco_etud.check_nom_prenom(
|
|
cnx, nom=values["nom"], prenom=values["prenom"]
|
|
)
|
|
if not ok:
|
|
raise ScoValueError(
|
|
"nom ou prénom invalide sur la ligne %d" % (linenum)
|
|
)
|
|
if nb_homonyms:
|
|
np_imported_homonyms += 1
|
|
# Insert in DB tables
|
|
formsemestre_id_etud = _import_one_student(
|
|
cnx,
|
|
formsemestre_id,
|
|
values,
|
|
GroupIdInferers,
|
|
annee_courante,
|
|
created_etudids,
|
|
linenum,
|
|
)
|
|
|
|
# Verification proportion d'homonymes: si > 10%, abandonne
|
|
log("scolars_import_excel_file: detected %d homonyms" % np_imported_homonyms)
|
|
if check_homonyms and np_imported_homonyms > len(created_etudids) / 10:
|
|
log("scolars_import_excel_file: too many homonyms")
|
|
raise ScoValueError(
|
|
"Il y a trop d'homonymes (%d étudiants)" % np_imported_homonyms
|
|
)
|
|
except:
|
|
cnx.rollback()
|
|
log("scolars_import_excel_file: aborting transaction !")
|
|
# Nota: db transaction is sometimes partly commited...
|
|
# here we try to remove all created students
|
|
cursor = cnx.cursor(cursor_factory=ndb.ScoDocCursor)
|
|
for etudid in created_etudids:
|
|
log("scolars_import_excel_file: deleting etudid=%s" % etudid)
|
|
cursor.execute(
|
|
"delete from notes_moduleimpl_inscription where etudid=%(etudid)s",
|
|
{"etudid": etudid},
|
|
)
|
|
cursor.execute(
|
|
"delete from notes_formsemestre_inscription where etudid=%(etudid)s",
|
|
{"etudid": etudid},
|
|
)
|
|
cursor.execute(
|
|
"delete from scolar_events where etudid=%(etudid)s", {"etudid": etudid}
|
|
)
|
|
cursor.execute(
|
|
"delete from adresse where etudid=%(etudid)s", {"etudid": etudid}
|
|
)
|
|
cursor.execute(
|
|
"delete from admissions where etudid=%(etudid)s", {"etudid": etudid}
|
|
)
|
|
cursor.execute(
|
|
"delete from group_membership where etudid=%(etudid)s",
|
|
{"etudid": etudid},
|
|
)
|
|
cursor.execute(
|
|
"delete from identite where id=%(etudid)s", {"etudid": etudid}
|
|
)
|
|
cnx.commit()
|
|
log("scolars_import_excel_file: re-raising exception")
|
|
raise
|
|
|
|
diag.append("Import et inscription de %s étudiants" % len(created_etudids))
|
|
|
|
ScolarNews.add(
|
|
typ=ScolarNews.NEWS_INSCR,
|
|
text="Inscription de %d étudiants" # peuvent avoir ete inscrits a des semestres differents
|
|
% len(created_etudids),
|
|
obj=formsemestre_id,
|
|
max_frequency=0,
|
|
)
|
|
|
|
log("scolars_import_excel_file: completing transaction")
|
|
cnx.commit()
|
|
|
|
# Invalide les caches des semestres dans lesquels on a inscrit des etudiants:
|
|
for formsemestre_id in formsemestre_to_invalidate:
|
|
sco_cache.invalidate_formsemestre(formsemestre_id=formsemestre_id)
|
|
|
|
return diag
|
|
|
|
|
|
def students_import_admission(
|
|
csvfile, type_admission="", formsemestre_id=None, return_html=True
|
|
):
|
|
"import donnees admission from Excel file (v2016)"
|
|
diag = scolars_import_admission(
|
|
csvfile,
|
|
formsemestre_id=formsemestre_id,
|
|
type_admission=type_admission,
|
|
)
|
|
if return_html:
|
|
H = [html_sco_header.sco_header(page_title="Import données admissions")]
|
|
H.append("<p>Import terminé !</p>")
|
|
H.append(
|
|
'<p><a class="stdlink" href="%s">Continuer</a></p>'
|
|
% url_for(
|
|
"notes.formsemestre_status",
|
|
scodoc_dept=g.scodoc_dept,
|
|
formsemestre_id=formsemestre_id,
|
|
)
|
|
)
|
|
if diag:
|
|
H.append("<p>Diagnostic: <ul><li>%s</li></ul></p>" % "</li><li>".join(diag))
|
|
|
|
return "\n".join(H) + html_sco_header.sco_footer()
|
|
|
|
|
|
def _import_one_student(
|
|
cnx,
|
|
formsemestre_id,
|
|
values,
|
|
GroupIdInferers,
|
|
annee_courante,
|
|
created_etudids,
|
|
linenum,
|
|
) -> int:
|
|
"""
|
|
Import d'un étudiant et inscription dans le semestre.
|
|
Return: id du semestre dans lequel il a été inscrit.
|
|
"""
|
|
log(
|
|
"scolars_import_excel_file: formsemestre_id=%s values=%s"
|
|
% (formsemestre_id, str(values))
|
|
)
|
|
# Identite
|
|
args = values.copy()
|
|
etudid = sco_etud.identite_create(cnx, args)
|
|
created_etudids.append(etudid)
|
|
# Admissions
|
|
args["etudid"] = etudid
|
|
args["annee"] = annee_courante
|
|
_ = sco_etud.admission_create(cnx, args)
|
|
# Adresse
|
|
args["typeadresse"] = "domicile"
|
|
args["description"] = "(infos admission)"
|
|
_ = sco_etud.adresse_create(cnx, args)
|
|
# Inscription au semestre
|
|
args["etat"] = scu.INSCRIT # etat insc. semestre
|
|
if formsemestre_id:
|
|
args["formsemestre_id"] = formsemestre_id
|
|
else:
|
|
args["formsemestre_id"] = values["codesemestre"]
|
|
formsemestre_id = values["codesemestre"]
|
|
try:
|
|
formsemestre_id = int(formsemestre_id)
|
|
except (ValueError, TypeError) as exc:
|
|
raise ScoValueError(
|
|
f"valeur invalide ou manquante dans la colonne codesemestre, ligne {linenum+1}"
|
|
) from exc
|
|
# recupere liste des groupes:
|
|
if formsemestre_id not in GroupIdInferers:
|
|
GroupIdInferers[formsemestre_id] = sco_groups.GroupIdInferer(formsemestre_id)
|
|
gi = GroupIdInferers[formsemestre_id]
|
|
if args["groupes"]:
|
|
groupes = args["groupes"].split(";")
|
|
else:
|
|
groupes = []
|
|
group_ids = [gi[group_name] for group_name in groupes]
|
|
group_ids = list({}.fromkeys(group_ids).keys()) # uniq
|
|
if None in group_ids:
|
|
raise ScoValueError(
|
|
"groupe invalide sur la ligne %d (groupe %s)" % (linenum, groupes)
|
|
)
|
|
|
|
do_formsemestre_inscription_with_modules(
|
|
int(args["formsemestre_id"]),
|
|
etudid,
|
|
group_ids,
|
|
etat=scu.INSCRIT,
|
|
method="import_csv_file",
|
|
)
|
|
return args["formsemestre_id"]
|
|
|
|
|
|
def _is_new_ine(cnx, code_ine):
|
|
"True if this code is not in DB"
|
|
etuds = sco_etud.identite_list(cnx, {"code_ine": code_ine})
|
|
return not etuds
|
|
|
|
|
|
# ------ Fonction ré-écrite en nov 2016 pour lire des fichiers sans etudid (fichiers APB)
|
|
def scolars_import_admission(datafile, formsemestre_id=None, type_admission=None):
|
|
"""Importe données admission depuis un fichier Excel quelconque
|
|
par exemple ceux utilisés avec APB
|
|
|
|
Cherche dans ce fichier les étudiants qui correspondent à des inscrits du
|
|
semestre formsemestre_id.
|
|
Le fichier n'a pas l'INE ni le NIP ni l'etudid, la correspondance se fait
|
|
via les noms/prénoms qui doivent être égaux (la casse, les accents et caractères spéciaux
|
|
étant ignorés).
|
|
|
|
On tolère plusieurs variantes pour chaque nom de colonne (ici aussi, la casse, les espaces
|
|
et les caractères spéciaux sont ignorés. Ainsi, la colonne "Prénom:" sera considéré comme "prenom".
|
|
|
|
Le parametre type_admission remplace les valeurs vides (dans la base ET dans le fichier importé) du champ type_admission.
|
|
Si une valeur existe ou est présente dans le fichier importé, ce paramètre est ignoré.
|
|
|
|
TODO:
|
|
- choix onglet du classeur
|
|
"""
|
|
|
|
log("scolars_import_admission: formsemestre_id=%s" % formsemestre_id)
|
|
members = sco_groups.get_group_members(
|
|
sco_groups.get_default_group(formsemestre_id)
|
|
)
|
|
etuds_by_nomprenom = {} # { nomprenom : etud }
|
|
diag = []
|
|
for m in members:
|
|
np = (adm_normalize_string(m["nom"]), adm_normalize_string(m["prenom"]))
|
|
if np in etuds_by_nomprenom:
|
|
msg = "Attention: hononymie pour %s %s" % (m["nom"], m["prenom"])
|
|
log(msg)
|
|
diag.append(msg)
|
|
etuds_by_nomprenom[np] = m
|
|
|
|
exceldata = datafile.read()
|
|
diag2, data = sco_excel.excel_bytes_to_list(exceldata)
|
|
if not data:
|
|
raise ScoException("scolars_import_admission: empty file !")
|
|
diag += diag2
|
|
cnx = ndb.GetDBConnexion()
|
|
|
|
titles = data[0]
|
|
# idx -> ('field', convertor)
|
|
fields = adm_get_fields(titles, formsemestre_id)
|
|
idx_nom = None
|
|
idx_prenom = None
|
|
for idx, field in fields.items():
|
|
if field[0] == "nom":
|
|
idx_nom = idx
|
|
if field[0] == "prenom":
|
|
idx_prenom = idx
|
|
if (idx_nom is None) or (idx_prenom is None):
|
|
log("fields indices=" + ", ".join([str(x) for x in fields]))
|
|
log("fields titles =" + ", ".join([fields[x][0] for x in fields]))
|
|
raise ScoFormatError(
|
|
"scolars_import_admission: colonnes nom et prenom requises",
|
|
dest_url=url_for(
|
|
"scolar.form_students_import_infos_admissions",
|
|
scodoc_dept=g.scodoc_dept,
|
|
formsemestre_id=formsemestre_id,
|
|
),
|
|
)
|
|
|
|
modifiable_fields = set(ADMISSION_MODIFIABLE_FIELDS)
|
|
|
|
nline = 2 # la premiere ligne de donnees du fichier excel est 2
|
|
n_import = 0
|
|
for line in data[1:]:
|
|
# Retrouve l'étudiant parmi ceux du semestre par (nom, prenom)
|
|
nom = adm_normalize_string(line[idx_nom])
|
|
prenom = adm_normalize_string(line[idx_prenom])
|
|
if (nom, prenom) not in etuds_by_nomprenom:
|
|
msg = f"""Étudiant <b>{line[idx_nom]} {line[idx_prenom]} inexistant</b>"""
|
|
diag.append(msg)
|
|
else:
|
|
etud = etuds_by_nomprenom[(nom, prenom)]
|
|
cur_adm = sco_etud.admission_list(cnx, args={"etudid": etud["etudid"]})[0]
|
|
# peuple les champs presents dans le tableau
|
|
args = {}
|
|
for idx, field in fields.items():
|
|
field_name, convertor = field
|
|
if field_name in modifiable_fields:
|
|
try:
|
|
val = convertor(line[idx])
|
|
except ValueError as exc:
|
|
raise ScoFormatError(
|
|
'scolars_import_admission: valeur invalide, ligne %d colonne %s: "%s"'
|
|
% (nline, field_name, line[idx]),
|
|
dest_url=url_for(
|
|
"scolar.form_students_import_infos_admissions",
|
|
scodoc_dept=g.scodoc_dept,
|
|
formsemestre_id=formsemestre_id,
|
|
),
|
|
) from exc
|
|
if val is not None: # note: ne peut jamais supprimer une valeur
|
|
args[field_name] = val
|
|
if args:
|
|
args["etudid"] = etud["etudid"]
|
|
args["adm_id"] = cur_adm["adm_id"]
|
|
# Type admission: traitement particulier
|
|
if not cur_adm["type_admission"] and not args.get("type_admission"):
|
|
args["type_admission"] = type_admission
|
|
sco_etud.etudident_edit(cnx, args, disable_notify=True)
|
|
adr = sco_etud.adresse_list(cnx, args={"etudid": etud["etudid"]})
|
|
if adr:
|
|
args["adresse_id"] = adr[0]["adresse_id"]
|
|
sco_etud.adresse_edit(
|
|
cnx, args, disable_notify=True
|
|
) # pas de notification ici
|
|
else:
|
|
args["typeadresse"] = "domicile"
|
|
args["description"] = "(infos admission)"
|
|
adresse_id = sco_etud.adresse_create(cnx, args)
|
|
# log('import_adm: %s' % args )
|
|
# Change les groupes si nécessaire:
|
|
if "groupes" in args:
|
|
gi = sco_groups.GroupIdInferer(formsemestre_id)
|
|
groupes = args["groupes"].split(";")
|
|
group_ids = [gi[group_name] for group_name in groupes]
|
|
group_ids = list({}.fromkeys(group_ids).keys()) # uniq
|
|
if None in group_ids:
|
|
raise ScoValueError(
|
|
f"groupe invalide sur la ligne {nline} (groupes {groupes})"
|
|
)
|
|
|
|
for group_id in group_ids:
|
|
group = db.session.get(GroupDescr, group_id)
|
|
if group.partition.groups_editable:
|
|
sco_groups.change_etud_group_in_partition(
|
|
args["etudid"], group
|
|
)
|
|
else:
|
|
log("scolars_import_admission: partition non editable")
|
|
diag.append(
|
|
f"Attention: partition {group.partition} non editable (ignorée)"
|
|
)
|
|
|
|
#
|
|
diag.append("import de %s" % (etud["nomprenom"]))
|
|
n_import += 1
|
|
nline += 1
|
|
diag.append("%d lignes importées" % n_import)
|
|
if n_import > 0:
|
|
sco_cache.invalidate_formsemestre(formsemestre_id=formsemestre_id)
|
|
return diag
|
|
|
|
|
|
_ADM_PATTERN = re.compile(r"[\W]+", re.UNICODE) # supprime tout sauf alphanum
|
|
|
|
|
|
def adm_normalize_string(s):
|
|
"normalize unicode title"
|
|
return scu.suppress_accents(_ADM_PATTERN.sub("", s.strip().lower())).replace(
|
|
"_", ""
|
|
)
|
|
|
|
|
|
def adm_get_fields(titles, formsemestre_id):
|
|
"""Cherche les colonnes importables dans les titres (ligne 1) du fichier excel
|
|
return: { idx : (field_name, convertor) }
|
|
"""
|
|
format_dict = sco_import_format_dict()
|
|
fields = {}
|
|
idx = 0
|
|
for title in titles:
|
|
title_n = adm_normalize_string(title)
|
|
for k in format_dict:
|
|
for v in format_dict[k]["aliases"]:
|
|
if adm_normalize_string(v) == title_n:
|
|
typ = format_dict[k]["type"]
|
|
if typ == "real":
|
|
convertor = adm_convert_real
|
|
elif typ == "integer" or typ == "int":
|
|
convertor = adm_convert_int
|
|
else:
|
|
convertor = adm_convert_text
|
|
# doublons ?
|
|
if k in [x[0] for x in fields.values()]:
|
|
raise ScoFormatError(
|
|
f"""scolars_import_admission: titre "{title}" en double (ligne 1)""",
|
|
dest_url=url_for(
|
|
"scolar.form_students_import_infos_admissions",
|
|
scodoc_dept=g.scodoc_dept,
|
|
formsemestre_id=formsemestre_id,
|
|
),
|
|
)
|
|
fields[idx] = (k, convertor)
|
|
idx += 1
|
|
|
|
return fields
|
|
|
|
|
|
def adm_convert_text(v):
|
|
if isinstance(v, float):
|
|
return "{:g}".format(v) # evite "1.0"
|
|
return v
|
|
|
|
|
|
def adm_convert_int(v):
|
|
if type(v) != int and not v:
|
|
return None
|
|
return int(float(v)) # accept "10.0"
|
|
|
|
|
|
def adm_convert_real(v):
|
|
if type(v) != float and not v:
|
|
return None
|
|
return float(v)
|
|
|
|
|
|
def adm_table_description_format():
|
|
"""Table HTML (ou autre format) decrivant les donnees d'admissions importables"""
|
|
Fmt = sco_import_format_dict(with_codesemestre=False)
|
|
for k in Fmt:
|
|
Fmt[k]["attribute"] = k
|
|
Fmt[k]["aliases_str"] = ", ".join(Fmt[k]["aliases"])
|
|
if not Fmt[k]["allow_nulls"]:
|
|
Fmt[k]["required"] = "*"
|
|
if k in ADMISSION_MODIFIABLE_FIELDS:
|
|
Fmt[k]["writable"] = "oui"
|
|
else:
|
|
Fmt[k]["writable"] = "non"
|
|
titles = {
|
|
"attribute": "Attribut",
|
|
"type": "Type",
|
|
"required": "Requis",
|
|
"writable": "Modifiable",
|
|
"description": "Description",
|
|
"aliases_str": "Titres (variantes)",
|
|
}
|
|
columns_ids = ("attribute", "type", "writable", "description", "aliases_str")
|
|
|
|
tab = GenTable(
|
|
titles=titles,
|
|
columns_ids=columns_ids,
|
|
rows=list(Fmt.values()),
|
|
html_sortable=True,
|
|
html_class="table_leftalign",
|
|
preferences=sco_preferences.SemPreferences(),
|
|
)
|
|
return tab
|