# Forked from ScoDoc/ScoDoc -- 360 lines, 12 KiB, Python
|
# -*- mode: python -*-
|
||
|
# -*- coding: utf-8 -*-
|
||
|
|
||
|
import inspect
|
||
|
import pdb
|
||
|
|
||
|
import psycopg2
|
||
|
import sqlalchemy
|
||
|
from sqlalchemy import func
|
||
|
|
||
|
from flask import current_app
|
||
|
from app import db
|
||
|
from app.auth.models import User, get_super_admin
|
||
|
import app
|
||
|
from app import models
|
||
|
from app.scodoc import notesdb as ndb
|
||
|
|
||
|
|
||
|
def import_scodoc7_dept(dept_id: str, dept_db_uri=None):
    """Import a ScoDoc7 department into ScoDoc >= 8.1 (single database).

    A new ``Departement`` row is created and committed first, then every table
    listed in ``SCO7_TABLES_ORDONNEES`` is converted in order.

    Args:
        dept_id: department acronym (e.g. "RT").
        dept_db_uri: URI of the ScoDoc7 database, e.g. "postgresql:///SCORT";
            if None, ``postgresql:///SCO{dept_id}`` is used.

    Raises:
        ValueError: if a department with this acronym already exists (errors
            raised while converting a table propagate unchanged).
    """
    dept = models.Departement.query.filter_by(acronym=dept_id).first()
    if dept:
        raise ValueError(f"le département {dept_id} existe déjà !")
    if dept_db_uri is None:
        dept_db_uri = f"postgresql:///SCO{dept_id}"
    current_app.logger.info(f"connecting to database {dept_db_uri}")
    cnx = psycopg2.connect(dept_db_uri)
    try:
        cursor = cnx.cursor(cursor_factory=ndb.ScoDocCursor)
        # Create the destination department:
        dept = models.Departement(acronym=dept_id, description="migré de ScoDoc7")
        db.session.add(dept)
        db.session.commit()
        # Shared across all tables: { scodoc7 id (str) : scodoc8 id (int) }
        id_from_scodoc7 = {}
        # Objects whose owner is unknown are attached to this user:
        default_user = get_super_admin()
        for (table, id_name) in SCO7_TABLES_ORDONNEES:
            current_app.logger.info(f"{dept.acronym}: converting {table}...")
            klass = get_class_for_table(table)
            n = convert_table(
                dept, cursor, id_from_scodoc7, klass, id_name, default_user
            )
            current_app.logger.info(f"  inserted {n} objects.")
    finally:
        # Always release the connection to the source (ScoDoc7) database,
        # including when a conversion step fails.
        cnx.close()
|
||
|
|
||
|
|
||
|
def get_class_for_table(table):
    """Return the ORM object for the given SQL table name.

    Every public name of the ``models`` module is examined: ``db.Model``
    subclasses are matched on ``__tablename__``; any other object (such as a
    ``db.Table``, which is not a class) is matched on its ``name`` attribute.

    Raises:
        ValueError: if nothing in ``models`` corresponds to ``table``.
    """
    for name in dir(models):
        item = getattr(models, name)
        if inspect.isclass(item):
            if issubclass(item, db.Model):
                if item.__tablename__ == table:
                    return item
        try:  # for db.Table objects, which are not classes (isclass is false!)
            if item.name == table:
                return item
        except Exception:
            # Best effort: most module members have no usable ``name``.
            # Catch Exception (not a bare except) so that KeyboardInterrupt
            # and SystemExit are not silently swallowed.
            pass
    raise ValueError(f"No model for table {table}")
|
||
|
|
||
|
|
||
|
def get_boolean_columns(klass):
    """Return the list of column names of model `klass` whose SQL type is Boolean."""
    mapped_names = sqlalchemy.inspect(klass).columns.keys()
    return [
        col_name
        for col_name in mapped_names
        if isinstance(
            getattr(klass, col_name).expression.type,
            sqlalchemy.sql.sqltypes.Boolean,
        )
    ]
|
||
|
|
||
|
|
||
|
def get_table_max_id(klass):
    """Return the highest ``id`` currently stored in this Table.

    Returns:
        -1 if the table has no "id" column, 0 if it has one but holds no row,
        otherwise the maximum id.
    """
    # Test for the column *name* "id" (not the builtin function `id`).
    if "id" not in sqlalchemy.inspect(klass).columns.keys():
        return -1
    # The table name comes from the ORM metadata, not from user input.
    sql_table = str(klass.description)
    with db.engine.connect() as con:
        row = con.execute(
            sqlalchemy.text("SELECT max(id) FROM " + sql_table)
        ).fetchone()
    if row is None or row[0] is None:  # empty table: max() yields NULL
        return 0
    return row[0]
|
||
|
|
||
|
|
||
|
def convert_table(
    dept, cursor, id_from_scodoc7: dict, klass=None, id_name=None, default_user=None
):
    """Convert all rows of one ScoDoc7 table and insert them in the destination.

    Args:
        dept: destination department, passed through to ``convert_object``.
        cursor: cursor on the ScoDoc7 database (must provide ``dictfetchall``).
        id_from_scodoc7: mapping {scodoc7 id: scodoc8 id}, updated in place.
        klass: either a SQLAlchemy ``Table`` or an ORM model class.
        id_name: name of the ScoDoc7 id column (falsy if the table has none).
        default_user: user receiving objects whose owner login is unknown.

    Returns:
        The number of rows read from the source table.
    """
    # Is it a plain Table or an ORM Model?
    is_table = isinstance(klass, sqlalchemy.sql.schema.Table)
    cnx = None
    try:
        if is_table:
            current_id = get_table_max_id(klass)
            has_id = current_id != -1
            cnx = db.engine.connect()
            table_name = str(klass.description)
            boolean_columns = []
        else:
            has_id = True
            table_name = klass.__tablename__
            # Boolean columns (values must be converted from int)
            boolean_columns = get_boolean_columns(klass)
            # Start from the highest id currently present
            # (we are assumed to be the only client of the destination db)
            max_row = db.session.query(func.max(klass.id)).first()
            if (max_row is None) or (max_row[0] is None):
                current_id = 0
            else:
                current_id = max_row[0]
        # mapping: login (scodoc7) -> user id (scodoc8)
        login2id = {u.user_name: u.id for u in User.query}

        # tables have the same name in both versions of ScoDoc:
        cursor.execute(f"SELECT * FROM {table_name}")
        objects = cursor.dictfetchall()

        for obj in objects:
            current_id += 1
            convert_object(
                current_id,
                dept,
                obj,
                has_id,
                id_from_scodoc7,
                klass,
                is_table,
                id_name,
                boolean_columns,
                login2id,
                default_user,
                cnx,
            )
    finally:
        # Release the engine connection even when a row fails to convert.
        if cnx is not None:
            cnx.close()

    db.session.commit()  # write the whole table at once
    return len(objects)
|
||
|
|
||
|
|
||
|
# Per-table renaming/removal of ScoDoc7 attributes, applied by convert_object
# to ORM models only:
#   { table name : { scodoc7 attribute : new attribute name, or None to drop } }
ATTRIBUTES_MAPPING = {
    "admissions": {
        "debouche": None,
    },
    "adresse": {
        "entreprise_id": None,
    },
    "etud_annotations": {
        "zope_authenticated_user": "author",
        "zope_remote_addr": None,
    },
    "identite": {
        "foto": None,
    },
    "notes_formsemestre": {
        "etape_apo2": None,  # => suppressed
        "etape_apo3": None,
        "etape_apo4": None,
        # preferences, no longer stored in formsemestre:
        "bul_show_decision": None,
        "bul_show_uevalid": None,
        "nomgroupetd": None,
        "nomgroupetp": None,
        "nomgroupeta": None,
        "gestion_absence": None,
        "bul_show_codemodules": None,
        "bul_show_rangs": None,
        "bul_show_ue_rangs": None,
        "bul_show_mod_rangs": None,
    },
    "partition": {
        "compute_ranks": None,
    },
    "notes_appreciations": {
        "zope_authenticated_user": "author",
        "zope_remote_addr": None,
    },
    "scolog": {
        "remote_addr": None,
        "remote_host": None,
    },
}
|
||
|
|
||
|
|
||
|
def convert_object(
    new_id,
    dept,
    obj: dict,
    has_id: bool = True,
    id_from_scodoc7: dict = None,
    klass=None,
    is_table: bool = False,
    id_name=None,
    boolean_columns=None,
    login2id=None,
    default_user=None,
    cnx=None,
):
    """Convert one ScoDoc7 row and insert it in the destination database.

    ``obj`` is modified in place: the ScoDoc7 id is removed, obsolete
    attributes are renamed or dropped, references are remapped, and the result
    is either inserted through ``cnx`` (Table) or added to ``db.session`` (ORM
    model). Nothing is committed here.

    Args:
        new_id: value stored in ``obj["id"]`` when ``has_id`` is true.
        dept: its ``id`` is stored in ``obj["dept_id"]`` if ``klass`` has a
            ``dept_id`` attribute.
        obj: the source row, as a dict {column name: value}.
        has_id: whether the destination has an "id" to set.
        id_from_scodoc7: mapping {scodoc7 id: scodoc8 id}; read to remap
            references and updated with this object's ids.
        klass: SQLAlchemy Table (if ``is_table``) or ORM model class.
        is_table: true when ``klass`` is a Table rather than a model.
        id_name: key of the ScoDoc7 id in ``obj`` (falsy if there is none).
        boolean_columns: keys of ``obj`` whose values are converted with bool().
        login2id: mapping {ScoDoc7 login: ScoDoc8 user id}.
        default_user: its ``id`` replaces any login missing from ``login2id``.
        cnx: connection used to execute the insert when ``is_table`` is true.

    Raises:
        ValueError: when a referenced ScoDoc7 id has no known new id (except
            for the log-like tables listed below, where it is set to None).
    """
    # Remove the ScoDoc7 id (eg "formsemestre_id"), which will become "id"
    if id_name:
        old_id = obj[id_name]
        del obj[id_name]
    else:
        old_id = None  # ScoDoc7 tables without id
    if is_table:
        table_name = str(klass.description)
    else:
        table_name = klass.__tablename__
    # Fields holding user ids:
    # login string in ScoDoc7, numeric uid in ScoDoc 8+
    USER_REFS = {"responsable_id", "ens_id", "uid"}
    if not is_table:
        # Remove obsolete attributes (very old ScoDoc versions):
        attributs = ATTRIBUTES_MAPPING.get(table_name, {})
        # rename or delete the attributes
        for k in attributs.keys() & obj.keys():
            v = attributs[k]
            if v is not None:
                obj[v] = obj[k]
            del obj[k]
    # map the ids (foreign keys); only existing keys are reassigned, so
    # iterating over obj while updating it is safe
    for k in obj:
        if (k.endswith("id") or k == "object") and k not in USER_REFS | {
            "semestre_id",
            "sem_id",
        }:
            old_ref = obj[k]
            if old_ref is not None:
                if isinstance(old_ref, str):
                    old_ref = old_ref.strip()
                elif k == "entreprise_id":  # special numeric id
                    # same "<table>.<id>" key format as stored at the end of
                    # this function for integer ids
                    old_ref = f"entreprises.{old_ref}"
                elif k == "entreprise_corresp_id":
                    old_ref = f"entreprise_correspondant.{old_ref}"

                if old_ref == "NULL" or not old_ref:  # buggy old entries
                    new_ref = None
                elif old_ref in id_from_scodoc7:
                    new_ref = id_from_scodoc7[old_ref]
                elif (not is_table) and table_name in {
                    "scolog",
                    "etud_annotations",
                    "notes_notes_log",
                    "scolar_news",
                }:
                    # tables with "fake" keys
                    # (the referenced object may have disappeared)
                    new_ref = None
                else:
                    raise ValueError(f"no new id for {table_name}.{k}='{obj[k]}' !")
                obj[k] = new_ref
    # Remap users to user.id
    # If the user does not exist, attach the object to the admin
    for k in USER_REFS & obj.keys():
        login_scodoc7 = obj[k]
        uid = login2id.get(login_scodoc7)
        if not uid:
            uid = default_user.id
            current_app.logger.warning(
                f"non existent user: {login_scodoc7}: giving {table_name}({old_id}) to admin"
            )
            # raise ValueError(f"non existent user: {login_scodoc7}")
        obj[k] = uid
    # Convert the booleans
    for k in boolean_columns:
        obj[k] = bool(obj[k])

    # Add the department if needed:
    if hasattr(klass, "dept_id"):
        obj["dept_id"] = dept.id

    # Set the id explicitly (this avoids a commit() after each entry)
    if has_id:
        obj["id"] = new_id

    if is_table:
        statement = sqlalchemy.insert(klass).values(**obj)
        _ = cnx.execute(statement)
    else:
        new_obj = klass(**obj)  # ORM object
        db.session.add(new_obj)

    # Record the id for later references (foreign keys):
    if id_name and has_id:
        if isinstance(old_id, int):
            # int ids were used for the "entreprises" tables
            old_id = table_name + "." + str(old_id)
        id_from_scodoc7[old_id] = new_id
|
||
|
|
||
|
|
||
|
# Tables sorted topologically with respect to their foreign keys:
# g = nx.read_adjlist("misc/model-scodoc7.csv", create_using=nx.DiGraph,delimiter=";")
# L = list(reversed(list(nx.topological_sort(g))))
SCO7_TABLES_ORDONNEES = [
    # (SQL table, name of the scodoc7 id column; "" when the table has none)
    ("notes_formations", "formation_id"),
    ("notes_ue", "ue_id"),
    ("notes_matieres", "matiere_id"),
    ("notes_formsemestre", "formsemestre_id"),
    ("notes_modules", "module_id"),
    ("notes_moduleimpl", "moduleimpl_id"),
    (
        "notes_modules_enseignants",
        "modules_enseignants_id",
    ),  # (relation) used to have an id modules_enseignants_id
    ("partition", "partition_id"),
    ("identite", "etudid"),
    ("entreprises", "entreprise_id"),
    ("notes_evaluation", "evaluation_id"),
    ("group_descr", "group_id"),
    ("group_membership", "group_membership_id"),  # (relation)
    ("notes_semset", "semset_id"),
    ("notes_tags", "tag_id"),
    ("itemsuivi", "itemsuivi_id"),
    ("itemsuivi_tags", "tag_id"),
    ("adresse", "adresse_id"),
    ("admissions", "adm_id"),
    ("absences", ""),
    ("scolar_news", "news_id"),
    ("scolog", ""),
    ("etud_annotations", "id"),
    ("billet_absence", "billet_id"),
    ("entreprise_correspondant", "entreprise_corresp_id"),
    ("entreprise_contact", "entreprise_contact_id"),
    ("absences_notifications", ""),
    # ("notes_form_modalites", "form_modalite_id"), : already initialised
    ("notes_appreciations", "id"),
    ("scolar_autorisation_inscription", "autorisation_inscription_id"),
    ("scolar_formsemestre_validation", "formsemestre_validation_id"),
    ("scolar_events", "event_id"),
    ("notes_notes_log", "id"),
    ("notes_notes", ""),
    ("notes_moduleimpl_inscription", "moduleimpl_inscription_id"),
    ("notes_formsemestre_inscription", "formsemestre_inscription_id"),
    ("notes_formsemestre_custommenu", "custommenu_id"),
    (
        "notes_formsemestre_ue_computation_expr",
        "notes_formsemestre_ue_computation_expr_id",
    ),
    ("notes_formsemestre_uecoef", "formsemestre_uecoef_id"),
    ("notes_semset_formsemestre", ""),  # (relation)
    ("notes_formsemestre_etapes", ""),
    ("notes_formsemestre_responsables", ""),  # (relation)
    ("notes_modules_tags", ""),
    ("itemsuivi_tags_assoc", ""),  # (relation)
    ("sco_prefs", "pref_id"),
]
|
||
|
|
||
|
"""
|
||
|
from tools.import_scodoc7_dept import *
|
||
|
import_scodoc7_dept( "RT" )
|
||
|
"""
|