# -*- coding: UTF-8 -*-

"""Flask application: ScoDoc.

Creates the Flask application object and registers the `flask ...`
command-line commands used to administrate a ScoDoc instance.
"""

import datetime
from pprint import pprint as pp
import re
import sys

import click
import flask
from flask.cli import with_appcontext
from flask.templating import render_template
from flask_login import login_user, logout_user, current_user
import psycopg2
import sqlalchemy as sa

import app as mapp
from app import create_app, cli, db
from app import initialize_scodoc_database
from app import clear_scodoc_cache
from app import models
from app.auth.models import User, Role, UserRole
from app.entreprises.models import entreprises_reset_database
from app.models import Departement, departements
from app.models import Formation, UniteEns, Matiere, Module
from app.models import FormSemestre, FormSemestreInscription
from app.models import GroupDescr
from app.models import Identite
from app.models import ModuleImpl, ModuleImplInscription
from app.models import Partition
from app.models import ScolarAutorisationInscription, ScolarFormSemestreValidation
from app.models.but_refcomp import (
    ApcCompetence,
    ApcNiveau,
    ApcParcours,
    ApcReferentielCompetences,
)
from app.models.but_validations import ApcValidationAnnee, ApcValidationRCUE
from app.models.evaluations import Evaluation
from app.models.formsemestre import notes_formsemestre_responsables
from app.models.moduleimpls import notes_modules_enseignants
from app.scodoc import sco_dump_db
from app.scodoc.sco_logos import make_logo_local
from app.scodoc.sco_permissions import Permission
from app.views import notes, scolar, ScoData
import app.scodoc.sco_utils as scu
import tools
from tools.fakedatabase import create_test_api_database

from config import RunningConfig

# From here on, the name `app` is the Flask application object;
# the `app` package itself stays reachable as `mapp`.
app = create_app(RunningConfig)
cli.register(app)

# NOTE(review): several commands below `return` an integer status. Under
# click's default standalone mode these return values do not appear to become
# the process exit status -- confirm before relying on them in shell scripts.


@app.context_processor
def inject_sco_utils():
    "Make Permission, sco and scu available in all Jinja templates"
    # if modified, put the same in conftest.py#27
    return {
        "DEBUG": flask.current_app.config["DEBUG"],
        "Permission": Permission,
        "scu": scu,
        "sco": ScoData(),
    }


@app.shell_context_processor
def make_shell_context():
    """Return the names pre-loaded in the `flask shell` namespace."""
    # Heavy or rarely needed modules are imported only when the shell starts.
    # `mapp` (the app package) and `scu` come from the module-level imports.
    import numpy as np
    import pandas as pd
    from app.scodoc import notesdb as ndb
    from app.comp import res_sem
    from app.comp.res_but import ResultatsSemestreBUT

    return {
        "ApcCompetence": ApcCompetence,
        "ApcNiveau": ApcNiveau,
        "ApcParcours": ApcParcours,
        "ApcReferentielCompetences": ApcReferentielCompetences,
        "ApcValidationRCUE": ApcValidationRCUE,
        "ApcValidationAnnee": ApcValidationAnnee,
        "ctx": app.test_request_context(),
        "current_app": flask.current_app,
        "current_user": current_user,
        "datetime": datetime,
        "Departement": Departement,
        "db": db,
        "Evaluation": Evaluation,
        "flask": flask,
        "Formation": Formation,
        "FormSemestre": FormSemestre,
        "FormSemestreInscription": FormSemestreInscription,
        "GroupDescr": GroupDescr,
        "Identite": Identite,
        "login_user": login_user,
        "logout_user": logout_user,
        "mapp": mapp,
        "Matiere": Matiere,
        "models": models,
        "Module": Module,
        "ModuleImpl": ModuleImpl,
        "ModuleImplInscription": ModuleImplInscription,
        "ndb": ndb,
        "notes": notes,
        "np": np,
        "Partition": Partition,
        "pd": pd,
        "Permission": Permission,
        "pp": pp,
        "res_sem": res_sem,
        "ResultatsSemestreBUT": ResultatsSemestreBUT,
        "Role": Role,
        "ScoDocSiteConfig": models.ScoDocSiteConfig,
        "scolar": scolar,
        "ScolarAutorisationInscription": ScolarAutorisationInscription,
        "ScolarFormSemestreValidation": ScolarFormSemestreValidation,
        "ScolarNews": models.ScolarNews,
        "scu": scu,
        "UniteEns": UniteEns,
        "User": User,
        "UserRole": UserRole,
    }
    # Typical first steps once in the shell:
    # ctx.push()
    # admin = User.query.filter_by(user_name="admin").first()
    # login_user(admin)


@app.cli.command()
@click.option("--erase/--no-erase", default=False)
def sco_db_init(erase=False):  # sco-db-init
    """Initialize the database.
    Starts from an existing database and create all
    the necessary SQL tables and functions.
    """
    # Refuse to initialize when the admin mail is not configured.
    if not app.config.get("SCODOC_ADMIN_MAIL"):
        sys.stderr.write(
            """La variable SCODOC_ADMIN_MAIL n'est pas positionnée: vérifier votre .env"""
        )
        return 100
    initialize_scodoc_database(erase=erase)


@app.cli.command()
@click.argument("database")
def anonymize_db(database):  # anonymize-db
    """Anonymize the named database (NOT the current one!)"""
    # Destructive operation: require an explicit confirmation.
    click.confirm(
        f"L'anonymisation va affecter la base {database} et PERDRE beaucoup de données.\nContinuer ?",
        abort=True,
    )
    sco_dump_db.anonymize_db(database)
    click.echo(f"Base {database} pseudonymisée")


@app.cli.command()
def user_db_clear():
    """Erase all users and roles from the database !"""
    click.echo("Erasing the users database !")
    _clear_users_db()


def _clear_users_db():
    """Delete every UserRole and User row, after interactive confirmation.

    The session is rolled back and the error re-raised if deletion fails.
    """
    click.confirm(
        "This will erase all users and roles.\nAre you sure you want to continue?",
        abort=True,
    )
    db.reflect()
    try:
        # Associations first, then the users themselves.
        db.session.query(UserRole).delete()
        db.session.query(User).delete()
        db.session.commit()
    except BaseException:  # explicit form of the former bare `except:`
        db.session.rollback()
        raise


@app.cli.command()
@click.argument("username")
@click.argument("role")
@click.argument("dept")
@click.option("-n", "--nom", "nom")
@click.option("-p", "--prenom", "prenom")
def user_create(username, role, dept, nom=None, prenom=None):  # user-create
    "Create a new user"
    r = Role.get_named_role(role)
    if not r:
        sys.stderr.write(f"user_create: role {role} does not exist\n")
        return 1
    u = User.query.filter_by(user_name=username).first()
    if u:
        sys.stderr.write(f"user_create: user {u} already exists\n")
        return 2
    # "@all" on the command line is stored as dept None.
    if dept == "@all":
        dept = None
    u = User(user_name=username, dept=dept, nom=nom, prenom=prenom)
    u.add_role(r, dept)
    db.session.add(u)
    db.session.commit()
    click.echo(f"created user, login: {u.user_name}, with role {r} in dept. {dept}")


@app.cli.command()
@click.argument("username")
def user_delete(username):  # user-delete
    "Try to delete this user. Fails if it's associated to some scodoc objects."
    u = User.query.filter_by(user_name=username).first()
    if not u:
        sys.stderr.write(f"user_delete: user {username} not found\n")
        return 2
    # Check that the user is not referenced anywhere.
    # - FormSemestre responsible (association table)
    query = sa.select(sa.func.count()).where(
        notes_formsemestre_responsables.c.responsable_id == u.id
    )
    nb = db.session.execute(query).scalar()
    if nb > 0:
        sys.stderr.write(
            "\nuser_delete: erreur: utilisateur déclaré comme resp. de semestre\n"
        )
        return 1
    # - Module responsible: must test THIS user's id (was a hard-coded id).
    if ModuleImpl.query.filter_by(responsable_id=u.id).count() > 0:
        sys.stderr.write(
            "\nuser_delete: erreur: utilisateur déclaré comme resp. de module\n"
        )
        return 1
    # - Module teacher (association table)
    query = sa.select(sa.func.count()).where(notes_modules_enseignants.c.ens_id == u.id)
    nb = db.session.execute(query).scalar()
    if nb > 0:
        sys.stderr.write(
            "\nuser_delete: erreur: utilisateur déclaré comme enseignant\n"
        )
        return 1
    #
    db.session.delete(u)
    try:
        db.session.commit()
    except (sa.exc.IntegrityError, psycopg2.errors.ForeignKeyViolation):
        # Leave the session usable after the failed commit.
        db.session.rollback()
        sys.stderr.write(
            f"""\nuser_delete: ne peux pas supprimer l'utilisateur {username}\ncar il est associé à des objets dans ScoDoc (modules, notes, ...).\n"""
        )
        return 1
    click.echo(f"deleted user, login: {username}")


@app.cli.command()
@click.argument("username")
@click.password_option()
def user_password(username, password=None):  # user-password
    "Set (or change) user's password"
    if not password:
        sys.stderr.write("user_password: missing password\n")
        return 1
    u = User.query.filter_by(user_name=username).first()
    if not u:
        sys.stderr.write(f"user_password: user {username} does not exists\n")
        return 1
    u.set_password(password)
    db.session.add(u)
    db.session.commit()
    click.echo(f"changed password for user {u}")


@app.cli.command()
@click.argument("rolename")
@click.argument("permissions", nargs=-1)
def create_role(rolename, permissions):  # create-role
    """Create a new role"""
    # Check rolename: ASCII letters and digits only
    if not re.match(r"^[a-zA-Z0-9]+$", rolename):
        sys.stderr.write(f"create_role: invalid rolename {rolename}\n")
        return 1
    # Check permissions: resolve every name before touching the database
    permission_list = []
    for permission_name in permissions:
        perm = Permission.get_by_name(permission_name)
        if not perm:
            # Report the offending *name*: `perm` is always falsy here.
            sys.stderr.write(
                f"create_role: invalid permission name {permission_name}\n"
            )
            sys.stderr.write(
                f"\tavailable permissions: {', '.join(Permission.permission_by_name)}.\n"
            )
            return 1
        permission_list.append(perm)

    role = Role.query.filter_by(name=rolename).first()
    if role:
        sys.stderr.write(f"create_role: role {rolename} already exists\n")
        return 1

    role = Role(name=rolename)
    for perm in permission_list:
        role.add_permission(perm)
    db.session.add(role)
    db.session.commit()


@app.cli.command()
def list_roles():  # list-roles
    """List all defined roles"""
    for role in Role.query:
        print(role)


@app.cli.command()
@click.argument("rolename")
@click.option("-a", "--add", "addpermissionname")
@click.option("-r", "--remove", "removepermissionname")
def edit_role(rolename, addpermissionname=None, removepermissionname=None):  # edit-role
    """Add [-a] and/or remove [-r] a permission to/from a role.
    In ScoDoc, permissions are not associated to users but to roles.
    Each user has a set of roles in each departement.

    Example: `flask edit-role -a EditApogee Ens`
    """
    # Resolve both permission names first, so nothing is modified on error.
    if addpermissionname:
        perm_to_add = Permission.get_by_name(addpermissionname)
        if not perm_to_add:
            sys.stderr.write(
                f"edit_role: permission {addpermissionname} does not exists\n"
            )
            return 1
    else:
        perm_to_add = None
    if removepermissionname:
        perm_to_remove = Permission.get_by_name(removepermissionname)
        if not perm_to_remove:
            sys.stderr.write(
                f"edit_role: permission {removepermissionname} does not exists\n"
            )
            return 1
    else:
        perm_to_remove = None
    role = Role.query.filter_by(name=rolename).first()
    if not role:
        sys.stderr.write(f"edit_role: role {rolename} does not exists\n")
        return 1
    if perm_to_add:
        role.add_permission(perm_to_add)
        click.echo(f"adding permission {addpermissionname} to role {rolename}")
    if perm_to_remove:
        role.remove_permission(perm_to_remove)
        click.echo(f"removing permission {removepermissionname} from role {rolename}")
    if perm_to_add or perm_to_remove:
        db.session.add(role)
        db.session.commit()
        print(role)


@app.cli.command()
@click.argument("rolename")
def delete_role(rolename):
    """Delete a role"""
    role = Role.query.filter_by(name=rolename).first()
    if role is None:
        sys.stderr.write(f"delete_role: role {rolename} does not exists\n")
        return 1
    db.session.delete(role)
    db.session.commit()


@app.cli.command()
@click.argument("username")
@click.option("-d", "--dept", "dept_acronym")
@click.option("-a", "--add", "add_role_name")
@click.option("-r", "--remove", "remove_role_name")
def user_role(username, dept_acronym=None, add_role_name=None, remove_role_name=None):
    """Add or remove a role to the given user in the given dept"""
    user: User = User.query.filter_by(user_name=username).first()
    if not user:
        sys.stderr.write(f"user_role: user {username} does not exists\n")
        return 1
    # Without any option, display the user's roles
    if dept_acronym is None and add_role_name is None and remove_role_name is None:
        print(f"Roles for user {user.user_name}")
        for assoc in sorted(
            user.user_roles, key=lambda ur: (ur.dept or "", ur.role.name)
        ):
            print(f"""{assoc.dept or "tous"}:\t{assoc.role.name}""")
    # The department, when given, must exist.
    if dept_acronym:
        if models.Departement.query.filter_by(acronym=dept_acronym).first() is None:
            sys.stderr.write(f"Erreur: le departement {dept_acronym} n'existe pas !\n")
            return 2

    if add_role_name:
        role = Role.query.filter_by(name=add_role_name).first()
        if role is None:
            sys.stderr.write(
                f"""user_role: role {add_role_name} does not exists (use list-roles to display existing roles)\n"""
            )
            return 2
        user.add_role(role, dept_acronym)
    if remove_role_name:
        role = Role.query.filter_by(name=remove_role_name).first()
        if role is None:
            sys.stderr.write(f"user_role: role {remove_role_name} does not exists\n")
            return 2
        assoc = UserRole.query.filter(
            UserRole.role == role, UserRole.user == user, UserRole.dept == dept_acronym
        ).first()
        # Removing a role the user does not have is silently accepted.
        if assoc:
            db.session.delete(assoc)
    db.session.commit()
    return 0


@app.cli.command()
@click.argument("user_name")
@click.argument("new_user_name")
def user_change_login(user_name, new_user_name):
    """Change user's login (user_name)"""
    user: User = User.query.filter_by(user_name=user_name).first()
    if not user:
        sys.stderr.write(f"user_change_login: user {user_name} does not exists\n")
        return 1
    user.change_user_name(new_user_name)


def abort_if_false(ctx, param, value):
    """Click option callback: abort the command when `value` is falsy."""
    if not value:
        ctx.abort()


@app.cli.command()
@click.option(
    "-y",
    "--yes",
    is_flag=True,
    callback=abort_if_false,
    expose_value=False,
    # The original line breaks inside this prompt were lost in SOURCE;
    # the text is reproduced as it appears there.
    prompt=(
        "Attention: Cela va effacer toutes les données du département "
        "(étudiants, notes, formations, etc). "
        "Voulez-vous vraiment continuer ?\n"
    ),
)
@click.option(
    "-f",
    "--force",
    is_flag=True,
    help="ignore non-existing departement",
)
@click.argument("dept")
def delete_dept(dept, force=False):  # delete-dept
    """Delete existing departement"""
    from app.scodoc import notesdb as ndb
    from app.scodoc import sco_dept

    msg = ""
    db.reflect()
    ndb.open_db_connection()
    d = models.Departement.query.filter_by(acronym=dept).first()
    if d is None and not force:
        sys.stderr.write(f"Erreur: le departement {dept} n'existe pas !\n")
        return 2
    elif d:
        # A non-empty message returned by delete_dept is reported as an error.
        msg = sco_dept.delete_dept(d.id)
        db.session.commit()
    if msg:
        print(f"Erreur:\n {msg}")
    return 0 if not msg else 1


@app.cli.command()
@click.argument("dept")
def create_dept(dept):  # create-dept
    "Create new departement"
    _ = departements.create_dept(dept)
    return 0


@app.cli.command()
@click.argument("depts", nargs=-1)
def list_depts(depts=""):  # list-depts
    """If dept exists, print it, else nothing.
    Called without arguments, list all depts along with their ids.
    """
    for dept in models.Departement.query.order_by(models.Departement.id):
        if not depts or dept.acronym in depts:
            print(f"{dept.id}\t{dept.acronym}")


@app.cli.command()
@click.option(
    "-n",
    "--name",
    is_flag=True,
    help="show database name instead of connexion string (required for "
    "dropdb/createdb commands)",
)
def scodoc_database(name):  # scodoc-database
    """print the database connexion string"""
    uri = app.config["SQLALCHEMY_DATABASE_URI"]
    if name:
        # The database name is the last path component of the URI.
        print(uri.split("/")[-1])
    else:
        print(uri)


@app.cli.command()
@with_appcontext
def import_scodoc7_users():  # import-scodoc7-users
    """Import users defined in ScoDoc7 postgresql database into ScoDoc 9
    The old database SCOUSERS must be alive and readable by the current user.
    This script is typically run as unix user "scodoc".
    The original SCOUSERS database is left unmodified.
    """
    messages = tools.import_scodoc7_user_db()
    click.echo("----")
    click.echo(f"import terminé: {len(messages)} warnings\n")
    click.echo("\n".join(messages) + "\n")


@app.cli.command()
@click.argument("dept")
@click.argument("dept_db_name")
@with_appcontext
def import_scodoc7_dept(dept: str, dept_db_name: str = ""):  # import-scodoc7-dept
    """Import a ScoDoc 7 department: dept: InfoComm, dept_db_name: SCOINFOCOMM"""
    dept_db_uri = f"postgresql:///{dept_db_name}"
    tools.import_scodoc7_dept(dept, dept_db_uri)


@app.cli.command()
@click.argument("dept", default="")
@with_appcontext
def migrate_scodoc7_dept_archives(dept: str):  # migrate-scodoc7-dept-archives
    """Post-migration: rename the archives according to the ScoDoc 9 ids"""
    tools.migrate_scodoc7_dept_archives(dept)


@app.cli.command()
@click.argument("dept", default="")
@with_appcontext
def migrate_scodoc7_dept_logos(dept: str = ""):  # migrate-scodoc7-dept-logos
    """Post-migration: rename the logos according to the ScoDoc 9 ids / dept"""
    tools.migrate_scodoc7_dept_logos(dept)


@app.cli.command()
@click.argument("logo", default=None)
@click.argument("dept", default=None)
@with_appcontext
def localize_logo(logo: str = None, dept: str = None):  # localize-logo
    """Make local to a dept a global logo (both logo and dept names are mandatory)"""
    # "header" and "footer" are refused here; see the message below.
    if logo in ["header", "footer"]:
        print(
            f"Can't make logo '{logo}' local: add a local version throught configuration form instead"
        )
        return
    make_logo_local(logoname=logo, dept_name=dept)


@app.cli.command()
@click.argument("formsemestre_id", type=click.INT)
@click.argument("xlsfile", type=click.File("rb"))
@click.argument("zipfile", type=click.File("rb"))
def photos_import_files(formsemestre_id: int, xlsfile, zipfile):
    """Import student photos from an Excel list and a zip archive of images."""
    # xlsfile and zipfile are declared as click.File("rb") arguments,
    # not as path strings.
    from app.scodoc import sco_trombino, sco_photos
    from app.auth.models import get_super_admin

    formsemestre = db.session.get(FormSemestre, formsemestre_id)
    if not formsemestre:
        sys.stderr.write("photos-import-files: formsemestre_id invalide\n")
        return 2

    # Work inside a request context, in the semestre's department and
    # logged in as the super admin.
    with app.test_request_context():
        mapp.set_sco_dept(formsemestre.departement.acronym)
        admin_user = get_super_admin()
        login_user(admin_user)

        def callback(etud: Identite, data, filename):
            return sco_photos.store_photo(etud, data, filename)

        (
            ignored_zipfiles,
            unmatched_files,
            stored_etud_filename,
        ) = sco_trombino.zip_excel_import_files(
            xlsfile=xlsfile,
            zipfile=zipfile,
            callback=callback,
            filename_title="fichier_photo",
        )
        print(
            render_template(
                "scolar/photos_import_files.txt",
                ignored_zipfiles=ignored_zipfiles,
                unmatched_files=unmatched_files,
                stored_etud_filename=stored_etud_filename,
            )
        )


@app.cli.command()
@click.option("--sanitize/--no-sanitize", default=False)
@with_appcontext
def clear_cache(sanitize):  # clear-cache
    """Clear ScoDoc cache
    This cache (currently Redis) is persistent between invocation
    and it may be necessary to clear it during upgrades,
    development or tests.
    """
    click.echo("Flushing Redis cache...")
    clear_scodoc_cache()
    if sanitize:
        # sanitizes all formations:
        click.echo("Checking formations...")
        for formation in Formation.query:
            formation.sanitize_old_formation()


@app.cli.command()
def init_test_database():  # init-test-database
    """Create the database objects used by the API tests
    (to be run on SCODOC_TEST or SCODOC_DEV)
    """
    click.echo("Initialisation base de test API...")
    # Push a request context and log in as the user named "admin".
    ctx = app.test_request_context()
    ctx.push()
    admin = User.query.filter_by(user_name="admin").first()
    login_user(admin)
    create_test_api_database.init_test_database()


def recursive_help(cmd, parent=None):
    """Print the help of `cmd`, then recursively of all its sub-commands."""
    ctx = click.core.Context(cmd, info_name=cmd.name, parent=parent)
    print(cmd.get_help(ctx))
    print()
    # Only command groups have a `commands` mapping.
    commands = getattr(cmd, "commands", {})
    for sub in commands.values():
        recursive_help(sub, ctx)


@app.cli.command()
def entreprises_reset_db():
    """Reset the tables of the 'relations entreprises' module"""
    click.confirm(
        "This will erase all data from the blueprint 'entreprises'.\nAre you sure you want to continue?",
        abort=True,
    )
    db.reflect()
    try:
        entreprises_reset_database()
    except BaseException:  # explicit form of the former bare `except:`
        db.session.rollback()
        raise


@app.cli.command()
def dumphelp():
    """Generate the complete help page for the documentation."""
    recursive_help(app.cli)


@app.cli.command()
@click.option("-h", "--host", default="127.0.0.1", help="The interface to bind to.")
@click.option("-p", "--port", default=5000, help="The port to bind to.")
@click.option(
    "--length",
    default=25,
    help="Number of functions to include in the profiler report.",
)
@click.option(
    "--profile-dir", default=None, help="Directory where profiler data files are saved."
)
def profile(host, port, length, profile_dir):
    """Start the application under the code profiler."""
    from werkzeug.middleware.profiler import ProfilerMiddleware
    from werkzeug.serving import run_simple

    app.wsgi_app = ProfilerMiddleware(
        app.wsgi_app, restrictions=[length], profile_dir=profile_dir
    )
    run_simple(
        host, port, app, use_debugger=False
    )  # use run_simple instead of app.run()


# <== Attendance ("assiduité") management ==>


@app.cli.command()
@click.option(
    "-d", "--dept", help="Restreint la migration au dept sélectionné (ACRONYME)"
)
@click.option(
    "-m",
    "--morning",
    help="Spécifie l'heure de début des cours format `hh:mm`",
)
@click.option(
    "-n",
    "--noon",
    help="Spécifie l'heure de fin du matin format `hh:mm`",
)
@click.option(
    "-a",
    "--afternoon",
    help="Spécifie l'heure de début de l'après-midi format `hh:mm` valeur identique à --noon si non spécifié",
)
@click.option(
    "-e",
    "--evening",
    help="Spécifie l'heure de fin des cours format `hh:mm`",
)
@with_appcontext
def migrate_abs_to_assiduites(
    dept: str = None,
    morning: str = None,
    noon: str = None,
    afternoon: str = None,
    evening: str = None,
):  # migrate-abs-to-assiduites
    """Migrate absences to the new "assiduités" (attendance) module"""
    tools.migrate_abs_to_assiduites(dept, morning, noon, afternoon, evening)


# Disabled profiling variant of the call above, kept for reference:
# import cProfile
# cProfile.runctx(
#     f"tools.migrate_abs_to_assiduites({dept})",
#     {"tools": tools},
#     {},
#     "migration-nimes",
# )


@app.cli.command()
@click.option(
    "-d",
    "--dept",
    help="Restreint la suppression au dept sélectionné (ACRONYME)",
)
@click.option(
    "-a",
    "--assiduites",
    is_flag=True,
    help="Supprime les assiduités de scodoc",
)
@click.option(
    "-j",
    "--justificatifs",
    is_flag=True,
    help="Supprime les justificatifs de scodoc",
)
@with_appcontext
def downgrade_assiduites_module(
    dept: str = None, assiduites: bool = False, justificatifs: bool = False
):
    """Delete the assiduités and/or the justificatifs, for all departments
    or only for the selected one"""
    # The three command-line values are forwarded unchanged, in this order.
    selection = (dept, assiduites, justificatifs)
    tools.downgrade_module(*selection)


@app.cli.command()
def generate_ens_calendars():  # generate-ens-calendars
    """Generate the teachers' calendars from the semesters' ics files"""
    # Imported on demand, only when this command runs.
    from tools.edt import edt_ens as ens_calendars

    ens_calendars.generate_ens_calendars()


@app.cli.command()
@click.option(
    "-e",
    "--endpoint",
    default="api.",
    help="Endpoint à partir duquel générer la documentation des routes",
)
@with_appcontext
def gen_api_doc(endpoint):  # gen-api-doc
    """Generate the documentation of the API routes."""
    # `endpoint` is handed to the generator as its `endpoint_start` value.
    tools.gen_api_doc(app, endpoint_start=endpoint)