#!/opt/scodoc/venv/bin/python
# -*- coding: utf-8 -*-
# -*- mode: python -*-

##############################################################################
#
# Gestion scolarite IUT
#
# Copyright (c) 1999 - 2024 Emmanuel Viennet.  All rights reserved.
#
#   This program is free software; you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation; either version 2 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program; if not, write to the Free Software
#   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
#   Emmanuel Viennet      emmanuel.viennet@viennet.net
#
##############################################################################

"""Anonymize a ScoDoc database.

Run as user "scodoc", with scodoc and postgresql up.

Works entirely at the SQL level; no SQLAlchemy model is used.

The database is designated by the (last) command-line argument, which is
passed unchanged to ``psycopg2.connect``.

E. Viennet, Jan 2019, Feb 2024
"""

import random
import re
import sys
import traceback
import urllib.parse

import psycopg2
from psycopg2 import extras


def log(msg):
    """Write ``msg`` followed by a newline to stderr.

    stdout is flushed first so that messages printed on both streams stay in
    order when they end up in the same terminal or file.
    """
    sys.stdout.flush()
    sys.stderr.write(msg + "\n")
    sys.stderr.flush()


def usage():
    """Print the command-line synopsis on stderr and exit with status 1."""
    sys.stdout.flush()
    sys.stderr.flush()
    print(f"Usage: {sys.argv[0]} [--users] dbname", file=sys.stderr)
    sys.exit(1)


# --- Anonymization functions, as SQL expressions.
# These strings are pasted verbatim on the right-hand side of "SET col = ...".
# NOTE(review): random_text_md5() is not defined in this file; presumably it is
# an SQL function already installed in the target database -- confirm.
anonymize_name = "random_text_md5(8)"
anonymize_date = "'1970-01-01'"
anonymize_false = "FALSE"
anonymize_question_str = "'?'"
anonymize_null = "NULL"


def _load_lines(path):
    """Return the stripped, non-blank lines of the UTF-8 text file ``path``.

    Blank lines are skipped: an empty entry would later yield an empty name,
    and ``prenom[0]`` would raise IndexError in ``anonymize_users``.
    """
    with open(path, encoding="utf8") as f:
        stripped = [line.strip() for line in f]
    return [line for line in stripped if line]


# --- Lists of last names and first names used to replace identities
NOMS = _load_lines("/opt/scodoc/tools/fakeportal/nomsprenoms/noms.txt")
PRENOMS = _load_lines("/opt/scodoc/tools/fakeportal/nomsprenoms/prenoms.txt")

# --- Fields to anonymize (this configuration could be moved to a separate
#     file; the code would then be generic for any postgresql database).
#
# We try to remove the personal data of students and companies.
#
# Keys are "table.column"; values are the SQL expression assigned to the
# column for every row of the table.
ANONYMIZED_FIELDS = {
    "identite.nom": anonymize_name,
    "identite.prenom": anonymize_name,
    "identite.nom_usuel": anonymize_null,
    "identite.civilite_etat_civil": anonymize_null,
    "identite.prenom_etat_civil": anonymize_null,
    "identite.date_naissance": anonymize_date,
    "identite.lieu_naissance": anonymize_question_str,
    "identite.dept_naissance": anonymize_question_str,
    "identite.nationalite": anonymize_question_str,
    "identite.statut": anonymize_null,
    "identite.boursier": anonymize_false,
    "identite.photo_filename": anonymize_null,
    "identite.code_nip": anonymize_null,
    "identite.code_ine": anonymize_null,
    "identite.scodoc7_id": anonymize_null,
    "adresse.email": "'ano@nyme.fr'",
    "adresse.emailperso": anonymize_null,
    "adresse.domicile": anonymize_null,
    "adresse.codepostaldomicile": anonymize_null,
    "adresse.villedomicile": anonymize_null,
    "adresse.paysdomicile": anonymize_null,
    "adresse.telephone": anonymize_null,
    "adresse.telephonemobile": anonymize_null,
    "adresse.fax": anonymize_null,
    "admissions.nomlycee": anonymize_name,
    "billet_absence.description": anonymize_null,
    "etud_annotations.comment": anonymize_name,
    "notes_appreciations.comment": anonymize_name,
}

# (table, column) pairs in which user names are stored as external references
# and must therefore follow any change of "user".user_name.
_USER_NAME_REFERENCES = (
    ("etud_annotations", "author"),
    ("scolog", "authenticated_user"),
    ("scolar_news", "authenticated_user"),
    ("notes_appreciations", "author"),
    ("are_historique", "authenticated_user"),
)

# Matches the "scheme://user:password@" prefix of a connection URI.
# libpq accepts both the "postgres://" and "postgresql://" schemes.
_URI_PASSWORD_RE = re.compile(r"(postgres(?:ql)?://[^:]+:)([^@]+)(@)")


def anonymize_column(cursor, tablecolumn):
    """Anonymize one column, for every row of its table.

    ``tablecolumn`` has the form ``table_name.column_name``, for example
    ``"identite.nom"``, and must be a key of ``ANONYMIZED_FIELDS``
    (KeyError otherwise). The associated SQL expression is assigned to the
    column with a single UPDATE executed on ``cursor``.
    """
    table, column = tablecolumn.split(".")
    anonymized = ANONYMIZED_FIELDS[tablecolumn]
    log(f"processing {tablecolumn}")
    # The statement is built by string interpolation: identifiers and SQL
    # expressions cannot be passed as bound parameters. All three parts come
    # from the ANONYMIZED_FIELDS constant above, never from user input.
    cursor.execute(f"UPDATE {table} SET {column} = {anonymized};")


def rename_students(cursor):
    """Give every student a fictitious last name and first name.

    Each row of "identite" gets a name drawn at random from ``NOMS`` and
    ``PRENOMS``. ``cursor`` must return rows addressable by column name.
    """
    # Only the primary key is needed; fetch everything before updating since
    # the loop below modifies the same table.
    cursor.execute("""SELECT id FROM "identite";""")
    etuds = cursor.fetchall()
    for etud in etuds:
        nom, prenom = random.choice(NOMS), random.choice(PRENOMS)
        cursor.execute(
            """UPDATE "identite"
            SET nom=%(nom)s, prenom=%(prenom)s
            WHERE id=%(id)s
            """,
            {
                "id": etud["id"],
                "nom": nom,
                "prenom": prenom,
            },
        )


def anonymize_users(cursor):
    """Anonymize the user table.

    Credentials, tokens, dates and e-mail addresses are reset for all users.
    Every user except 'admin' additionally gets a random name, a matching
    e-mail address and a new unique user_name; the tables listed in
    ``_USER_NAME_REFERENCES`` are updated to follow the renaming.
    ``cursor`` must return rows addressable by column name.
    """
    log("processing user table")
    # One statement for all the constant resets (applies to 'admin' as well).
    cursor.execute(
        """UPDATE "user" SET
            email = 'x@y.fr',
            password_hash = '*',
            password_scodoc7 = NULL,
            date_created = '2001-01-01',
            date_expiration = '2201-12-31',
            token = NULL,
            token_expiration = NULL;"""
    )
    # Change last names / first names / e-mails
    cursor.execute("""SELECT * FROM "user" WHERE user_name <> 'admin';""")
    users = cursor.fetchall()  # fetch all: this table is modified in the loop
    nb_users = len(users)
    # New names must differ from every existing name (otherwise the reference
    # updates below could capture rows belonging to a not-yet-renamed user)
    # and from 'admin', which is excluded from the query but still exists.
    used_user_names = {u["user_name"] for u in users}
    used_user_names.add("admin")
    for i, user in enumerate(users):
        user_name = user["user_name"]
        nom, prenom = random.choice(NOMS), random.choice(PRENOMS)
        new_name = (prenom[0] + nom).lower()
        # Make it unique
        while new_name in used_user_names:
            new_name += "x"
        used_user_names.add(new_name)
        print(f"{i}/{nb_users}\t{user_name} > {new_name}")
        cursor.execute(
            """UPDATE "user"
            SET nom=%(nom)s, prenom=%(prenom)s, email=%(email)s,
                user_name=%(new_name)s
            WHERE id=%(id)s
            """,
            {
                "email": f"{prenom}.{nom}@ano.nyme",
                "id": user["id"],
                "nom": nom,
                "prenom": prenom,
                "new_name": new_name,
            },
        )
        # Propagate the new user name to the tables that store it as an
        # external reference. Table and column names come from a constant,
        # so interpolating them is safe; values are bound parameters.
        for table, field in _USER_NAME_REFERENCES:
            cursor.execute(
                f"""UPDATE "{table}"
                SET {field}=%(new_name)s
                WHERE {field}=%(user_name)s
                """,
                {
                    "new_name": new_name,
                    "user_name": user_name,
                },
            )


def uri_rm_passwd(uri):
    """Return ``uri`` with the password of its userinfo part masked.

    ``scheme://user:password@...`` becomes ``scheme://user:*****@...`` for
    the ``postgres`` and ``postgresql`` schemes; any other string is
    returned unchanged.
    """
    return _URI_PASSWORD_RE.sub(r"\1*****\3", uri)


def anonymize_db(cursor):
    """Process, one by one, the columns listed in ANONYMIZED_FIELDS."""
    for tablecolumn in ANONYMIZED_FIELDS:
        anonymize_column(cursor, tablecolumn)


def main():
    """Command-line entry point: ``[--users] dburi``.

    Anonymizes student data, and user accounts too when ``--users`` is
    given. Everything runs in a single transaction, committed only if all
    steps succeed. Exits with status 1 on bad usage or connection failure.
    """
    process_users = False
    if len(sys.argv) < 2 or len(sys.argv) > 3:
        usage()
    if len(sys.argv) > 2:
        if sys.argv[1] != "--users":
            usage()
        dburi = sys.argv[2]
        process_users = True
    else:
        dburi = sys.argv[1]
    # Used for messages only.
    dbname = urllib.parse.urlparse(dburi).path.lstrip("/")
    log(f"\nAnonymizing database {dbname}")
    try:
        cnx = psycopg2.connect(dburi)
    except psycopg2.Error:
        log(f"\n*** Error: can't connect to database {dbname} ***\n")
        log(f"""connexion uri was "{uri_rm_passwd(dburi)}" """)
        traceback.print_exc()
        # Without a connection nothing below can run.
        sys.exit(1)
    try:
        cnx.set_session(autocommit=False)
        cursor = cnx.cursor(cursor_factory=extras.DictCursor)
        anonymize_db(cursor)
        rename_students(cursor)
        if process_users:
            anonymize_users(cursor)
        cnx.commit()
    finally:
        # Closing without a commit discards the pending transaction, so a
        # failure part-way leaves the database untouched.
        cnx.close()


if __name__ == "__main__":
    main()