80 lines
3.0 KiB
Python
80 lines
3.0 KiB
Python
# -*- mode: python -*-
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import pdb
|
|
import re
|
|
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
|
|
from flask import current_app
|
|
from app import db
|
|
from app.auth.models import User, Role
|
|
|
|
|
|
def import_scodoc7_user_db(scodoc7_db="dbname=SCOUSERS"):
    """Create users from an existing ScoDoc7 users database (SCOUSERS).

    The resulting users are in SCO8USERS, handled via the Flask/SQLAlchemy ORM.

    Every row of the ScoDoc7 ``sco_users`` table is read. A row whose
    ``user_name`` already matches an existing ``User`` is left unchanged
    (a warning is logged). Otherwise a new ``User`` is built from the row,
    its ScoDoc7 roles are translated into ``(Role, dept)`` pairs and the
    user is added to the SQLAlchemy session. The session is committed once,
    after all rows have been processed.

    Args:
        scodoc7_db: connection string passed as-is to ``psycopg2.connect``.

    Returns:
        The list of warning messages (str) emitted for roles that could not
        be parsed or that match no existing ``Role``.

    Raises:
        AssertionError: if a role marked as disabled (leading ``-``) is found
            on a user that was imported as active.
    """
    # Patterns are loop-invariant: compile them once.
    # Special case "RespPe<Dept>" (e.g. RespPeRT): the general pattern below
    # is non-greedy and would split it at the first capital, i.e. "Resp"/"PeRT".
    resp_pe_re = re.compile(r"^(-?RespPe)([A-Z][A-Za-z0-9]*?)$")
    # General case, e.g. "EnsRT" -> role "Ens", department "RT".
    general_re = re.compile(r"^(-?[A-Za-z0-9]+?)([A-Z][A-Za-z0-9]*?)$")

    messages = []
    cnx = psycopg2.connect(scodoc7_db)
    try:
        cursor = cnx.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            cursor.execute("SELECT * FROM sco_users;")
            for u7 in cursor:
                if User.query.filter_by(user_name=u7["user_name"]).first():
                    # A user with the same name already exists: do not touch it.
                    current_app.logger.warning(
                        "User {} exists and is left unchanged".format(u7["user_name"])
                    )
                    continue

                u = User(
                    user_name=u7["user_name"],
                    email=u7["email"],
                    date_modif_passwd=u7["date_modif_passwd"],
                    nom=u7["nom"],
                    prenom=u7["prenom"],
                    dept=u7["dept"],
                    passwd_temp=u7["passwd_temp"],
                    date_expiration=u7["date_expiration"],
                    password_scodoc7=u7["passwd"],
                    # A NULL status in ScoDoc7 is imported as an active user.
                    active=(u7["status"] is None),
                )

                # Set roles:
                # ScoDoc7 roles are stored as a comma-separated string,
                # e.g. 'AdminRT,EnsRT'.
                roles7 = u7["roles"].split(",") if u7["roles"] else []
                for role_dept in roles7:
                    m = resp_pe_re.match(role_dept) or general_re.match(role_dept)
                    if not m:
                        msg = (
                            f"User {u7['user_name']}: invalid role '{role_dept}' (ignoring)"
                        )
                        current_app.logger.warning(msg)
                        messages.append(msg)
                        continue

                    role_name = m.group(1)
                    if role_name.startswith("-"):
                        # Roles prefixed with "-" belong to users disabled in
                        # ScoDoc7: sanity-check the user, then silently ignore
                        # the old (disabled) role.
                        assert not u.active, (
                            f"User {u7['user_name']}: disabled role "
                            f"'{role_dept}' found on an active user"
                        )
                        continue

                    dept = m.group(2)
                    role = Role.query.filter_by(name=role_name).first()
                    if not role:
                        msg = f"User {u7['user_name']}: ignoring role '{role_dept}'"
                        current_app.logger.warning(msg)
                        messages.append(msg)
                        continue

                    u.add_role(role, dept)

                db.session.add(u)
                current_app.logger.info("imported user {}".format(u))
        finally:
            # Always release the cursor, even if a row fails to import.
            cursor.close()
    finally:
        # Always release the ScoDoc7 connection (it was previously leaked).
        cnx.close()

    db.session.commit()
    return messages