fix twice imported user bug & misc. enhancements

This commit is contained in:
Jean-Marie Place 2021-08-23 07:37:14 +02:00
parent f385f46897
commit db75f14118

View File

@ -85,7 +85,16 @@ def generate_excel_sample():
def import_excel_file(datafile): def import_excel_file(datafile):
"Create users from Excel file" """
Import scodoc users from Excel file.
This method:
* checks that the current_user has the ability to do so (at the moment only a SuperAdmin). He may thereoff import users with any well formed role into any deprtment (or all)
* Once the check is done ans successfull, build the list of users (does not check the data)
* call :func:`import_users` to actually do the job
history: scodoc7 with no SuperAdmin every Admin_XXX could import users.
:param datafile: the stream from to the to be imported
:return: same as import users
"""
# Check current user privilege # Check current user privilege
auth_dept = current_user.dept auth_dept = current_user.dept
auth_name = str(current_user) auth_name = str(current_user)
@ -93,7 +102,7 @@ def import_excel_file(datafile):
raise AccessDenied("invalid user (%s) must be SuperAdmin" % auth_name) raise AccessDenied("invalid user (%s) must be SuperAdmin" % auth_name)
# Récupération des informations sur l'utilisateur courant # Récupération des informations sur l'utilisateur courant
log("sco_import_users.import_excel_file by %s" % auth_name) log("sco_import_users.import_excel_file by %s" % auth_name)
# Read the data from the stream
exceldata = datafile.read() exceldata = datafile.read()
if not exceldata: if not exceldata:
raise ScoValueError("Ficher excel vide ou invalide") raise ScoValueError("Ficher excel vide ou invalide")
@ -116,42 +125,55 @@ def import_excel_file(datafile):
"colonnes incorrectes (on attend %d, et non %d) <br/> (colonnes manquantes: %s, colonnes invalides: %s)" "colonnes incorrectes (on attend %d, et non %d) <br/> (colonnes manquantes: %s, colonnes invalides: %s)"
% (len(TITLES), len(fs), list(cols.keys()), unknown) % (len(TITLES), len(fs), list(cols.keys()), unknown)
) )
# ok, same titles... # ok, same titles... : build the list of dictionaries
U = [] users = []
for line in data[1:]: for line in data[1:]:
d = {} d = {}
for i in range(len(fs)): for i in range(len(fs)):
d[fs[i]] = line[i] d[fs[i]] = line[i]
U.append(d) users.append(d)
return import_users(U, auth_dept=auth_dept) return import_users(users)
def import_users(users, auth_dept=""): def import_users(users):
"""Import des utilisateurs:
Pour chaque utilisateur à créer:
- vérifier données
- générer mot de passe aléatoire
- créer utilisateur et mettre le mot de passe
- envoyer mot de passe par mail
En cas d'erreur: supprimer tous les utilisateurs que l'on vient de créer.
""" """
Import users from a list of users_descriptors.
descriptors are dictionaries hosting users's data.
The operation is atomic (all the users are imported or none)
:param users: list of descriptors to be imported
:return: a tuple that describe the result of the import:
* ok: import ok or aborted
* messages: the list of messages
* the # of users created
"""
""" Implémentation:
Pour chaque utilisateur à créer:
* vérifier données (y compris que le même nom d'utilisateur n'est pas utilisé plusieurs fois)
* générer mot de passe aléatoire
* créer utilisateur et mettre le mot de passe
* envoyer mot de passe par mail
Les utilisateurs à créer sont stockés dans un dictionnaire.
L'ajout effectif ne se fait qu'en fin de fonction si aucune erreur n'a été détectée
"""
if len(users) == 0:
import_ok = False
msg_list = ["Feuille vide ou illisible"]
else:
created = {} # liste de uid créés
msg_list = []
line = 1 # start from excel line #2
import_ok = True
def append_msg(msg): def append_msg(msg):
msg_list.append("Ligne %s : %s" % (line, msg)) msg_list.append("Ligne %s : %s" % (line, msg))
if len(users) == 0:
import_ok = False
msg_list = ["Feuilles vide ou illisible"]
else:
created = [] # liste de uid créés
msg_list = []
line = 1 # satr from excel line #2
import_ok = True
try: try:
for u in users: for u in users:
user_ok = True
line = line + 1 line = line + 1
user_ok, msg = sco_users.check_modif_user( user_ok, msg = sco_users.check_modif_user(
0, 0,
@ -175,7 +197,7 @@ def import_users(users, auth_dept=""):
"identifiant '%s' invalide (pas d'accents ni de caractères spéciaux)" "identifiant '%s' invalide (pas d'accents ni de caractères spéciaux)"
% u["user_name"] % u["user_name"]
) )
elif len(u["user_name"]) > 64: if len(u["user_name"]) > 64:
user_ok = False user_ok = False
append_msg( append_msg(
"identifiant '%s' trop long (64 caractères)" % u["user_name"] "identifiant '%s' trop long (64 caractères)" % u["user_name"]
@ -189,29 +211,41 @@ def import_users(users, auth_dept=""):
if len(u["email"]) > 120: if len(u["email"]) > 120:
user_ok = False user_ok = False
append_msg("email '%s' trop long (120 caractères)" % u["email"]) append_msg("email '%s' trop long (120 caractères)" % u["email"])
# check that tha same user_name has not already been described in this import
if u["user_name"] in created.keys():
user_ok = False
append_msg(
"l'utilisateur '%s' a déjà été décrit ligne %s"
% (u["user_name"], created[u["user_name"]]["line"])
)
# check département # check département
if u["dept"] != "": if u["dept"] != "":
dept = Departement.query.filter_by(acronym=u["dept"]).first() dept = Departement.query.filter_by(acronym=u["dept"]).first()
if dept is None: if dept is None:
user_ok = False user_ok = False
append_msg("département '%s' inexistant" % u["dept"]) append_msg("département '%s' inexistant" % u["dept"])
# check roles / ignore whitespaces around roles / build roles_string
# roles_string (expected by User) appears as column 'roles' in excel file
roles_list = []
for role in u["roles"].split(","): for role in u["roles"].split(","):
try: try:
_, _ = UserRole.role_dept_from_string(role) _, _ = UserRole.role_dept_from_string(role.strip())
roles_list.append(role.strip())
except ScoValueError as value_error: except ScoValueError as value_error:
user_ok = False user_ok = False
append_msg("role : %s " % role) append_msg("role %s : %s" % (role, value_error))
# Création de l'utilisateur (via SQLAlchemy) u["roles_string"] = ",".join(roles_list)
if user_ok: if user_ok:
created.append(u) u["line"] = line
created[u["user_name"]] = u
else: else:
import_ok = False import_ok = False
except ScoValueError as value_error: except ScoValueError as value_error:
log("import_users: exception: abort create %s" % str(created)) log("import_users: exception: abort create %s" % str(created.keys()))
raise ScoValueError(msg) # re-raise exception raise ScoValueError(msg) # re-raise exception
if import_ok: if import_ok:
for u in created: for u in created.values():
u["roles_string"] = u["roles"] # Création de l'utilisateur (via SQLAlchemy)
user = User() user = User()
user.from_dict(u, new_user=True) user.from_dict(u, new_user=True)
db.session.add(user) db.session.add(user)