TestsScoDoc7API/ImportScolars.py

780 lines
28 KiB
Python
Raw Normal View History

2020-09-26 16:19:37 +02:00
# -*- mode: python -*-
# -*- coding: utf-8 -*-
##############################################################################
#
# Gestion scolarite IUT
#
# Copyright (c) 1999 - 2020 Emmanuel Viennet. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Emmanuel Viennet emmanuel.viennet@viennet.net
#
##############################################################################
""" Importation des etudiants à partir de fichiers CSV
"""
import os, sys, time, pdb
from sco_utils import *
from notesdb import *
from notes_log import log
import scolars
import sco_formsemestre
import sco_groups
import sco_excel
import sco_groups_view
import sco_news
from sco_news import NEWS_INSCR, NEWS_NOTE, NEWS_FORM, NEWS_SEM, NEWS_MISC
from sco_formsemestre_inscriptions import do_formsemestre_inscription_with_modules
from gen_tables import GenTable
# Format description file (path relative to the Product directory);
# sco_import_format() reads it from SCO_SRCDIR + "/" + FORMAT_FILE.
FORMAT_FILE = "misc/format_import_etudiants.txt"
# Fields that can be modified via "Import données admission":
# scolars_import_admission() only takes into account the columns whose
# field name is listed here.
ADMISSION_MODIFIABLE_FIELDS = (
    "code_nip",
    "code_ine",
    "date_naissance",
    "lieu_naissance",
    "bac",
    "specialite",
    "annee_bac",
    "math",
    "physique",
    "anglais",
    "francais",
    "type_admission",
    "boursier_prec",
    "qualite",
    "rapporteur",
    "score",
    "commentaire",
    "classement",
    "apb_groupe",
    "apb_classement_gr",
    "nomlycee",
    "villelycee",
    "codepostallycee",
    "codelycee",
    # Address:
    "email",
    "emailperso",
    "domicile",
    "codepostaldomicile",
    "villedomicile",
    "paysdomicile",
    "telephone",
    "telephonemobile",
    # Job outcome ("debouche")
    "debouche",
    # Groups
    "groupes",
)
# ----
def sco_import_format(with_codesemestre=True):
    """Read the import format description file (FORMAT_FILE).

    Each line of the file that is neither empty nor starting with "#" must
    hold at least 5 fields separated by ";":
        attribute; type; table; allow_nulls; description [; alias ...]

    Returns a list of tuples
        (fieldname, type, table, allow_nulls, description, aliases)
    where fieldname is the first word of the attribute, lowercased, the
    four following items are stripped strings (allow_nulls is NOT converted
    to a number) and aliases is a list of non-empty strings which always
    contains fieldname.

    If with_codesemestre is false, the "codesemestre" field is omitted.

    Raises ScoException if a line has fewer than 5 fields.
    """
    r = []
    # "with" guarantees that the file is closed, even when a line is invalid
    with open(SCO_SRCDIR + "/" + FORMAT_FILE) as f:
        for l in f:
            l = l.strip()
            if not l or l[0] == "#":
                continue  # empty line or comment
            fs = l.split(";")
            if len(fs) < 5:
                # Bug: invalid format file (fatal)
                raise ScoException(
                    "file %s has invalid format (expected %d fields, got %d) (%s)"
                    % (FORMAT_FILE, 5, len(fs), l)
                )
            # attribute title: normalized, first word only (DB field name)
            fieldname = fs[0].strip().lower().split()[0]
            typ, table, allow_nulls, description = [x.strip() for x in fs[1:5]]
            aliases = [x.strip() for x in fs[5:] if x.strip()]
            if fieldname not in aliases:
                aliases.insert(0, fieldname)  # prepend
            # Test the normalized name: the raw field may carry spaces or
            # capitals that would silently defeat the filter.
            if with_codesemestre or fieldname != "codesemestre":
                r.append((fieldname, typ, table, allow_nulls, description, aliases))
    return r
def sco_import_format_dict(with_codesemestre=True):
    """Return the import format as an ordered mapping.

    Keys are the field names, in the order returned by sco_import_format();
    each value is a dict
        { 'type', 'table', 'allow_nulls', 'description', 'aliases' }
    holding the corresponding items of the format tuple, unchanged.
    """
    R = collections.OrderedDict()
    for (
        fieldname,
        typ,
        table,
        allow_nulls,
        description,
        aliases,
    ) in sco_import_format(with_codesemestre=with_codesemestre):
        R[fieldname] = {
            "type": typ,
            "table": table,
            "allow_nulls": allow_nulls,
            "description": description,
            "aliases": aliases,
        }
    return R
def sco_import_generate_excel_sample(
    fmt,
    with_codesemestre=True,
    only_tables=None,
    with_groups=True,
    exclude_cols=None,
    extra_cols=None,
    group_ids=None,
    context=None,
    REQUEST=None,
):
    """Generates an excel document based on format fmt
    (format is the result of sco_import_format())

    If not None, only_tables can specify a list of sql table names
    (only columns from these tables will be generated).
    Columns named in exclude_cols are skipped; extra_cols are appended
    after the format columns. If with_groups, a "groupes" column is added
    unless already present.
    If group_ids (and context) are given, an "etudid" column is prepended
    and one line is generated for each member of these groups, filled with
    the current data. Otherwise the sheet only holds the title line.

    Returns the result of sco_excel.Excel_SimpleTable().
    """
    # None defaults instead of shared mutable [] defaults
    exclude_cols = exclude_cols or ()
    extra_cols = list(extra_cols) if extra_cols else []
    style = sco_excel.Excel_MakeStyle(bold=True)
    style_required = sco_excel.Excel_MakeStyle(bold=True, color="red")
    titles = []
    titlesStyles = []
    for l in fmt:
        name = strlower(l[0])
        if (not with_codesemestre) and name == "codesemestre":
            continue  # no codesemestre column
        if only_tables is not None and strlower(l[2]) not in only_tables:
            continue  # table not requested
        if name in exclude_cols:
            continue  # excluded column
        # l[3] is the allow_nulls flag: columns that do not allow nulls
        # get the "required" (red) style
        titlesStyles.append(style if int(l[3]) else style_required)
        titles.append(name)
    if with_groups and "groupes" not in titles:
        titles.append("groupes")
        titlesStyles.append(style)
    titles += extra_cols
    titlesStyles += [style] * len(extra_cols)
    if group_ids and context:
        groups_infos = sco_groups_view.DisplayedGroupsInfos(
            context, group_ids, REQUEST=REQUEST
        )
        members = groups_infos.members
        log(
            "sco_import_generate_excel_sample: group_ids=%s %d members"
            % (group_ids, len(members))
        )
        titles = ["etudid"] + titles
        titlesStyles = [style] + titlesStyles
        # fill the table with current data
        lines = []
        for member in members:
            etud = context.getEtudInfo(etudid=member["etudid"], filled=True)[0]
            row = []
            for field in titles:
                if field == "groupes":
                    sco_groups.etud_add_group_infos(
                        context, etud, groups_infos.formsemestre, sep=";"
                    )
                    row.append(etud["partitionsgroupes"])
                else:
                    key = strlower(field).split()[0]
                    row.append(etud.get(key, ""))
            lines.append(row)
    else:
        lines = [[]]  # empty content, titles only
    return sco_excel.Excel_SimpleTable(
        titles=titles, titlesStyles=titlesStyles, SheetName="Etudiants", lines=lines
    )
def students_import_excel(
    context,
    csvfile,
    REQUEST=None,
    formsemestre_id=None,
    check_homonyms=True,
    require_ine=False,
):
    """Import students from an Excel file.

    The import itself is delegated to scolars_import_excel_file() (the
    "photo_filename" column being excluded). When REQUEST is given, returns
    an HTML page listing the diagnostic messages, with a link to continue;
    otherwise returns None.
    """
    messages = scolars_import_excel_file(
        csvfile,
        context.Notes,
        REQUEST,
        formsemestre_id=formsemestre_id,
        check_homonyms=check_homonyms,
        require_ine=require_ine,
        exclude_cols=["photo_filename"],
    )
    if not REQUEST:
        return None
    # where the "Continuer" link leads
    dest = (
        "formsemestre_status?formsemestre_id=%s" % formsemestre_id
        if formsemestre_id
        else REQUEST.URL1
    )
    page = [context.sco_header(REQUEST, page_title="Import etudiants"), "<ul>"]
    page.extend(["<li>%s</li>" % message for message in messages])
    page.extend(
        [
            "</ul>",
            "<p>Import terminé !</p>",
            '<p><a class="stdlink" href="%s">Continuer</a></p>' % dest,
        ]
    )
    return "\n".join(page) + context.sco_footer(REQUEST)
def _strip_enclosing_quotes(s):
    "Remove one pair of matching quotes (single or double) enclosing s, if any"
    if s and (
        (s[0] == '"' and s[-1] == '"') or (s[0] == "'" and s[-1] == "'")
    ):
        return s[1:-1]
    return s


def _convert_import_number(val, typ, linenum, colname):
    """Convert the non-empty string val according to the column type typ.

    "real" gives a float and "integer" an int (a decimal comma is accepted);
    for any other type val is returned unchanged.
    Raises ScoValueError if val is not a valid number of this type.
    """
    if typ == "real":
        val = val.replace(",", ".")  # French decimal comma
        try:
            return float(val)
        except ValueError:
            raise ScoValueError(
                "valeur nombre reel invalide (%s) sur line %d, colonne %s"
                % (val, linenum, colname)
            )
    if typ == "integer":
        val = val.replace(",", ".")  # French decimal comma
        try:
            # values like "2006.0" must be accepted
            number = float(val)
            if number % 1.0 > 1e-4:
                raise ValueError()
            return int(number)
        except (ValueError, OverflowError):
            raise ScoValueError(
                "valeur nombre entier invalide (%s) sur ligne %d, colonne %s"
                % (val, linenum, colname)
            )
    return val


def scolars_import_excel_file(
    datafile,
    context,
    REQUEST,
    formsemestre_id=None,
    check_homonyms=True,
    require_ine=False,
    exclude_cols=None,
):
    """Import students from an Excel file and enroll them in the given
    semester (and in ALL its modules).

    The first line of the sheet must hold exactly the column titles of the
    import format (see sco_import_format()), minus exclude_cols and minus
    "codesemestre" when formsemestre_id is given. If formsemestre_id is not
    given, each student is enrolled in the semester of its "codesemestre"
    column.

    - require_ine: lines whose INE code is empty or already present in the
      database are skipped (and logged). Otherwise an INE code already
      present raises an error.
    - check_homonyms: the import is aborted if the number of imported
      homonyms exceeds one tenth (floor) of the number of created students.

    All the students are created in a single transaction: on any error the
    transaction is rolled back, the students already created are deleted
    and the exception is re-raised.

    Returns the list of diagnostic messages.
    Raises ScoValueError on invalid content, ScoException on empty data.
    """
    exclude_cols = exclude_cols or ()  # None default instead of a shared []
    log("scolars_import_excel_file: formsemestre_id=%s" % formsemestre_id)
    cnx = context.GetDBConnexion(autocommit=False)
    annee_courante = time.localtime()[0]
    always_require_ine = context.get_preference("always_require_ine")
    exceldata = datafile.read()
    if not exceldata:
        raise ScoValueError("Ficher excel vide ou invalide")
    diag, data = sco_excel.Excel_to_list(exceldata)
    if not data:  # probably a bug
        raise ScoException("scolars_import_excel_file: empty file !")
    formsemestre_to_invalidate = set()

    # 1- --- check title line
    titles = {}
    for l in sco_import_format():
        tit = strlower(l[0]).split()[0]  # titles in lowercase, and take 1st word
        if (
            (not formsemestre_id) or (tit != "codesemestre")
        ) and tit not in exclude_cols:
            # title : (Type, Table, AllowNulls, Description, Aliases)
            titles[tit] = l[1:]
    # remove quotes, downcase and keep only 1st word
    try:
        fs = [strlower(stripquotes(s)).split()[0] for s in data[0]]
    except Exception:
        raise ScoValueError("Titres de colonnes invalides (ou vides ?)")
    # check columns titles
    if len(fs) != len(titles):
        missing = dict.fromkeys(titles)
        unknown = []
        for f in fs:
            if f in missing:
                del missing[f]
            else:
                unknown.append(f)
        raise ScoValueError(
            "Nombre de colonnes incorrect (devrait être %d, et non %d) <br/> (colonnes manquantes: %s, colonnes invalides: %s)"
            % (len(titles), len(fs), list(missing), unknown)
        )
    titleslist = []
    for t in fs:
        if t not in titles:
            raise ScoValueError('Colonne invalide: "%s"' % t)
        titleslist.append(t)
    # ok, same titles
    # Per-column description, computed once: (title, lowercased title, type,
    # allow_nulls). The format stores allow_nulls as a string, so it is
    # converted to a number, as sco_import_generate_excel_sample() does:
    # testing the string itself never detects "0".
    # NOTE(review): the null check below was ineffective before this
    # conversion; empty values in non-nullable columns are now rejected.
    columns = []
    for t in titleslist:
        typ, table, an, descr, aliases = tuple(titles[t])
        columns.append((t, strlower(t), typ, bool(int(an))))

    # Start inserting data, abort whole transaction in case of error
    created_etudids = []
    NbImportedHomonyms = 0
    GroupIdInferers = {}
    try:  # --- begin DB transaction
        for linenum, line in enumerate(data[1:], 1):
            # Read fields, check and convert type
            values = {}
            for i, cell in enumerate(line):
                colname, colkey, typ, allow_nulls = columns[i]
                val = _strip_enclosing_quotes(cell).strip()
                if not val and not allow_nulls:
                    raise ScoValueError(
                        "line %d: null value not allowed in column %s"
                        % (linenum, colname)
                    )
                if val == "":
                    val = None
                else:
                    val = _convert_import_number(val, typ, linenum, colname)
                # xxx Ad-hoc checks (should be in format description)
                if colkey == "sexe":
                    try:
                        val = scolars.normalize_sexe(val)
                    except Exception:
                        raise ScoValueError(
                            "valeur invalide pour 'SEXE' (doit etre 'M' ou 'MME' ou 'H' ou 'F', pas '%s') ligne %d, colonne %s"
                            % (val, linenum, colname)
                        )
                # Excel date conversion (only for purely numeric values):
                if colkey == "date_naissance" and val:
                    if re.match(r"^[0-9]*\.?[0-9]*$", str(val)):
                        val = sco_excel.xldate_as_datetime(float(val))
                # INE
                if colkey == "code_ine" and always_require_ine and not val:
                    raise ScoValueError(
                        "Code INE manquant sur ligne %d, colonne %s"
                        % (linenum, colname)
                    )
                # --
                values[colname] = val
            is_new_ine = values["code_ine"] and _is_new_ine(cnx, values["code_ine"])
            if require_ine and not is_new_ine:
                log("skipping %s (code_ine=%s)" % (values["nom"], values["code_ine"]))
                continue
            if values["code_ine"] and not is_new_ine:
                raise ScoValueError("Code INE dupliqué (%s)" % values["code_ine"])
            # Check nom/prenom
            ok, NbHomonyms = scolars.check_nom_prenom(
                cnx, nom=values["nom"], prenom=values["prenom"]
            )
            if not ok:
                raise ScoValueError(
                    "nom ou prénom invalide sur la ligne %d" % (linenum)
                )
            if NbHomonyms:
                NbImportedHomonyms += 1
            # Insert in DB tables
            formsemestre_to_invalidate.add(
                _import_one_student(
                    context,
                    cnx,
                    REQUEST,
                    formsemestre_id,
                    values,
                    GroupIdInferers,
                    annee_courante,
                    created_etudids,
                    linenum,
                )
            )
        # Check the proportion of homonyms: abort if above 10%
        # (explicit floor division, the Python 2 meaning of "/" on integers)
        log("scolars_import_excel_file: detected %d homonyms" % NbImportedHomonyms)
        if check_homonyms and NbImportedHomonyms > len(created_etudids) // 10:
            log("scolars_import_excel_file: too many homonyms")
            raise ScoValueError(
                "Il y a trop d'homonymes (%d étudiants)" % NbImportedHomonyms
            )
    except:
        # Deliberately catches everything: the cleanup must run whatever
        # interrupted the import, and the exception is always re-raised.
        cnx.rollback()
        log("scolars_import_excel_file: aborting transaction !")
        # Nota: db transaction is sometimes partly commited...
        # here we try to remove all created students
        cursor = cnx.cursor(cursor_factory=ScoDocCursor)
        cleanup_requests = (
            "delete from notes_moduleimpl_inscription where etudid=%(etudid)s",
            "delete from notes_formsemestre_inscription where etudid=%(etudid)s",
            "delete from scolar_events where etudid=%(etudid)s",
            "delete from adresse where etudid=%(etudid)s",
            "delete from admissions where etudid=%(etudid)s",
            "delete from group_membership where etudid=%(etudid)s",
            "delete from identite where etudid=%(etudid)s",
        )
        for etudid in created_etudids:
            log("scolars_import_excel_file: deleting etudid=%s" % etudid)
            for request in cleanup_requests:
                cursor.execute(request, {"etudid": etudid})
        cnx.commit()
        log("scolars_import_excel_file: re-raising exception")
        raise
    diag.append("Import et inscription de %s étudiants" % len(created_etudids))
    sco_news.add(
        context,
        REQUEST,
        typ=NEWS_INSCR,
        # students may have been enrolled in different semesters
        text="Inscription de %d étudiants" % len(created_etudids),
        object=formsemestre_id,
    )
    log("scolars_import_excel_file: completing transaction")
    cnx.commit()
    # Invalidate the caches of the semesters in which students were enrolled:
    context.Notes._inval_cache(formsemestre_id_list=formsemestre_to_invalidate)
    return diag
def _import_one_student(
    context,
    cnx,
    REQUEST,
    formsemestre_id,
    values,
    GroupIdInferers,
    annee_courante,
    created_etudids,
    linenum,
):
    """
    Import one student and enroll it in the semester.

    values maps the imported column names to their converted values.
    The identity, admission and address records are created from a copy of
    values, then the student is enrolled in formsemestre_id (or, if it is
    not given, in values["codesemestre"]) and in the groups listed,
    separated by ";", in the "groupes" value.

    The new etudid is appended to created_etudids as soon as the identity
    is created. GroupIdInferers is a cache { formsemestre_id :
    GroupIdInferer } filled on demand.

    Return: id of the semester in which the student was enrolled.
    Raises ScoValueError if a group cannot be resolved.
    """
    log(
        "scolars_import_excel_file: formsemestre_id=%s values=%s"
        % (formsemestre_id, str(values))
    )
    # Identity
    args = values.copy()
    etudid = scolars.identite_create(cnx, args, context=context, REQUEST=REQUEST)
    created_etudids.append(etudid)
    # Admissions
    args["etudid"] = etudid
    args["annee"] = annee_courante
    scolars.admission_create(cnx, args)
    # Address
    args["typeadresse"] = "domicile"
    args["description"] = "(infos admission)"
    scolars.adresse_create(cnx, args)
    # Enrollment in the semester
    args["etat"] = "I"  # state of the semester enrollment
    if not formsemestre_id:
        formsemestre_id = values["codesemestre"]
    args["formsemestre_id"] = formsemestre_id
    # get the list of groups:
    if formsemestre_id not in GroupIdInferers:
        GroupIdInferers[formsemestre_id] = sco_groups.GroupIdInferer(
            context, formsemestre_id
        )
    gi = GroupIdInferers[formsemestre_id]
    # .get(): the "groupes" column may have been excluded from the import
    groupes = args["groupes"].split(";") if args.get("groupes") else []
    # uniq (as a real list, whatever the Python version)
    group_ids = list(dict.fromkeys([gi[group_name] for group_name in groupes]))
    if None in group_ids:
        raise ScoValueError(
            "groupe invalide sur la ligne %d (groupe %s)" % (linenum, groupes)
        )
    do_formsemestre_inscription_with_modules(
        context,
        args["formsemestre_id"],
        etudid,
        group_ids,
        etat="I",
        REQUEST=REQUEST,
        method="import_csv_file",
    )
    return args["formsemestre_id"]
def _is_new_ine(cnx, code_ine):
    """Tell whether code_ine is absent from the database:
    True if scolars.identite_list() finds no student with this INE code."""
    known = scolars.identite_list(cnx, {"code_ine": code_ine})
    if known:
        return False
    return True
# ------ Fonction ré-écrite en nov 2016 pour lire des fichiers sans etudid (fichiers APB)
def scolars_import_admission(
    datafile, context, REQUEST, formsemestre_id=None, type_admission=None
):
    """Import admission data from an arbitrary Excel file,
    for instance those used with APB.

    Looks in this file for the students matching those enrolled in the
    semester formsemestre_id.

    The file has neither the INE, nor the NIP, nor the etudid: the matching
    is done on last name / first name, which must be equal (case, accents
    and special characters being ignored).

    Several variants are accepted for each column title (here too, case,
    spaces and special characters are ignored): the column "Prénom:" is
    thus considered as "prenom".

    The type_admission parameter replaces empty values (in the database AND
    in the imported file) of the type_admission field. If a value exists in
    the database or is present in the imported file, it is ignored.

    Only the fields listed in ADMISSION_MODIFIABLE_FIELDS are imported, and
    an empty cell never erases an existing value.

    Returns the list of diagnostic messages.
    Raises FormatError if the "nom" or "prenom" column is missing or if a
    value cannot be converted, ScoValueError if a group is invalid,
    ScoException if the file has no data.

    TODO:
    - choice of the sheet in the workbook
    """
    log("scolars_import_admission: formsemestre_id=%s" % formsemestre_id)
    members = sco_groups.get_group_members(
        context, sco_groups.get_default_group(context, formsemestre_id)
    )
    etuds_by_nomprenom = {}  # { (nom, prenom) : etud }
    diag = []
    for m in members:
        key = (adm_normalize_string(m["nom"]), adm_normalize_string(m["prenom"]))
        if key in etuds_by_nomprenom:
            msg = "Attention: hononymie pour %s %s" % (m["nom"], m["prenom"])
            log(msg)
            diag.append(msg)
        # in case of homonyms, the last member wins
        etuds_by_nomprenom[key] = m
    exceldata = datafile.read()
    diag2, data = sco_excel.Excel_to_list(exceldata, convert_to_string=False)
    if not data:
        raise ScoException("scolars_import_admission: empty file !")
    diag += diag2
    cnx = context.GetDBConnexion()
    titles = data[0]
    # idx -> ('field', convertor)
    fields = adm_get_fields(titles, formsemestre_id)
    idx_nom = None
    idx_prenom = None
    for idx in fields:
        if fields[idx][0] == "nom":
            idx_nom = idx
        if fields[idx][0] == "prenom":
            idx_prenom = idx
    if (idx_nom is None) or (idx_prenom is None):
        log("fields indices=" + ", ".join([str(x) for x in fields]))
        log("fields titles =" + ", ".join([fields[x][0] for x in fields]))
        raise FormatError(
            "scolars_import_admission: colonnes nom et prenom requises",
            dest_url="form_students_import_infos_admissions?formsemestre_id=%s"
            % formsemestre_id,
        )
    modifiable_fields = set(ADMISSION_MODIFIABLE_FIELDS)
    # Built on first use, then shared by all the lines.
    # NOTE(review): assumes the name -> group_id mapping of the semester does
    # not change during the import, as the per-semester cache kept by
    # scolars_import_excel_file() already does.
    gi = None
    n_import = 0
    # the first data line of the excel file is line 2
    for nline, line in enumerate(data[1:], 2):
        # Find the student among those of the semester, by (nom, prenom)
        nom = adm_normalize_string(line[idx_nom])
        prenom = adm_normalize_string(line[idx_prenom])
        if (nom, prenom) not in etuds_by_nomprenom:
            log(
                "unable to find %s %s among members" % (line[idx_nom], line[idx_prenom])
            )
            continue
        etud = etuds_by_nomprenom[(nom, prenom)]
        cur_adm = scolars.admission_list(cnx, args={"etudid": etud["etudid"]})[0]
        # fill in the fields present in the sheet
        args = {}
        for idx in fields:
            field_name, convertor = fields[idx]
            if field_name not in modifiable_fields:
                continue
            try:
                val = convertor(line[idx])
            except ValueError:
                raise FormatError(
                    'scolars_import_admission: valeur invalide, ligne %d colonne %s: "%s"'
                    % (nline, field_name, line[idx]),
                    dest_url="form_students_import_infos_admissions?formsemestre_id=%s"
                    % formsemestre_id,
                )
            if val is not None:  # note: a value can never be deleted
                args[field_name] = val
        if not args:
            continue  # nothing to import on this line
        args["etudid"] = etud["etudid"]
        args["adm_id"] = cur_adm["adm_id"]
        # Admission type: special handling
        if not cur_adm["type_admission"] and not args.get("type_admission"):
            args["type_admission"] = type_admission
        scolars.etudident_edit(cnx, args)
        adr = scolars.adresse_list(cnx, args={"etudid": etud["etudid"]})
        if adr:
            args["adresse_id"] = adr[0]["adresse_id"]
            # the context is not passed: no notification here
            scolars.adresse_edit(cnx, args)
        else:
            args["typeadresse"] = "domicile"
            args["description"] = "(infos admission)"
            scolars.adresse_create(cnx, args)
        # Change the groups if needed.
        # .get(): args only holds the columns found in the file with a
        # value, so "groupes" may be missing.
        if args.get("groupes"):
            if gi is None:
                gi = sco_groups.GroupIdInferer(context, formsemestre_id)
            groupes = args["groupes"].split(";")
            # uniq
            group_ids = list(
                dict.fromkeys([gi[group_name] for group_name in groupes])
            )
            if None in group_ids:
                raise ScoValueError(
                    "groupe invalide sur la ligne %d (groupe %s)" % (nline, groupes)
                )
            for group_id in group_ids:
                sco_groups.change_etud_group_in_partition(
                    context, args["etudid"], group_id, REQUEST=REQUEST
                )
        #
        diag.append("import de %s" % (etud["nomprenom"]))
        n_import += 1
    diag.append("%d lignes importées" % n_import)
    if n_import > 0:
        context._inval_cache(formsemestre_id=formsemestre_id)
    return diag
# Matches any run of non-word characters, i.e. everything except
# alphanumerics and the underscore
_ADM_PATTERN = re.compile(r"[\W]+", re.UNICODE)


def adm_normalize_string(s):
    """Normalize a title or a name so that variants compare equal.

    The string is stripped and lowercased, its non-word characters are
    removed, the result goes through suppression_diacritics() and finally
    the underscores (which the pattern keeps) are removed.
    """
    lowered = s.strip().lower()
    word_chars_only = _ADM_PATTERN.sub("", lowered)
    unaccented = suppression_diacritics(word_chars_only)
    return unaccented.replace("_", "")
def adm_get_fields(titles, formsemestre_id):
    """Look for the importable columns among the titles (line 1) of the
    excel file.

    A title matches a field of the import format when, once normalized by
    adm_normalize_string(), it equals one of the normalized aliases of this
    field. Titles matching no field are ignored.

    return: { idx : (field_name, convertor) } where idx is the index of the
    column in titles and convertor is the adm_convert_* function suited to
    the field type.
    Raises FormatError if two columns match the same field.
    """
    Fmt = sco_import_format_dict()
    # Normalize the aliases once, not for every title. Using a set per
    # field also means a title matches a given field at most once: two
    # aliases of the same field with the same normalized form (e.g.
    # differing only by an accent) must not be reported as a duplicated
    # column.
    normalized_aliases = [
        (k, set([adm_normalize_string(v) for v in Fmt[k]["aliases"]])) for k in Fmt
    ]
    convertors = {
        "real": adm_convert_real,
        "integer": adm_convert_int,
        "int": adm_convert_int,
    }
    fields = {}
    for idx, title in enumerate(titles):
        title_n = adm_normalize_string(title)
        for k, aliases_n in normalized_aliases:
            if title_n not in aliases_n:
                continue
            # duplicates ? (field already given by another column)
            if k in [x[0] for x in fields.values()]:
                raise FormatError(
                    'scolars_import_admission: titre "%s" en double (ligne 1)'
                    % (title),
                    dest_url="form_students_import_infos_admissions_apb?formsemestre_id=%s"
                    % formsemestre_id,
                )
            fields[idx] = (k, convertors.get(Fmt[k]["type"], adm_convert_text))
    return fields
def adm_convert_text(v):
    """Convertor for text columns: floats are formatted with "{:g}" (so that
    1.0 gives "1" and not "1.0"), any other value is returned unchanged."""
    # builtin type instead of types.FloatType, which only exists in Python 2
    if isinstance(v, float):
        return "{:g}".format(v)  # avoids "1.0"
    return v
def adm_convert_int(v):
    """Convertor for integer columns.

    Returns None for an empty value (None, "", ...), except for the integer
    0 which is a real value; otherwise returns v converted to int, through
    float so that "10.0" is accepted (the fractional part is truncated).
    Raises ValueError if v is not a number.
    """
    # builtin type instead of types.IntType, which only exists in Python 2;
    # bool is excluded to keep the exact-type semantics (False is "empty")
    is_plain_int = isinstance(v, int) and not isinstance(v, bool)
    if not is_plain_int and not v:
        return None
    return int(float(v))  # accept "10.0"
def adm_convert_real(v):
    """Convertor for real columns.

    Returns None for an empty value (None, "", ...), except for the float
    0.0 which is a real value; otherwise returns float(v).
    Raises ValueError if v is not a number.
    """
    # builtin type instead of types.FloatType, which only exists in Python 2
    if not isinstance(v, float) and not v:
        return None
    return float(v)
def adm_table_description_format(context):
    """Table (HTML or other format) describing the admission data that can
    be imported.

    Builds one row per field of the import format (without "codesemestre"),
    and returns a GenTable with the columns attribute, type, writable,
    description and accepted titles.
    """
    Fmt = sco_import_format_dict(with_codesemestre=False)
    for k in Fmt:
        Fmt[k]["attribute"] = k
        Fmt[k]["aliases_str"] = ", ".join(Fmt[k]["aliases"])
        # allow_nulls is stored as a string: convert it to a number (as
        # sco_import_generate_excel_sample() does), since testing the
        # string itself never detects "0"
        if not int(Fmt[k]["allow_nulls"]):
            Fmt[k]["required"] = "*"
        if k in ADMISSION_MODIFIABLE_FIELDS:
            Fmt[k]["writable"] = "oui"
        else:
            Fmt[k]["writable"] = "non"
    titles = {
        "attribute": "Attribut",
        "type": "Type",
        "required": "Requis",
        "writable": "Modifiable",
        "description": "Description",
        "aliases_str": "Titres (variantes)",
    }
    # NOTE(review): "required" has a title but is not among the displayed
    # columns; confirm whether this is intended.
    columns_ids = ("attribute", "type", "writable", "description", "aliases_str")
    tab = GenTable(
        titles=titles,
        columns_ids=columns_ids,
        rows=list(Fmt.values()),  # a real list, whatever the Python version
        html_sortable=True,
        html_class="table_leftalign",
        preferences=context.get_preferences(),
    )
    return tab