1
0
forked from ScoDoc/ScoDoc
ScoDoc/scodoc.py

646 lines
20 KiB
Python
Executable File

# -*- coding: UTF-8 -*-
"""Application Flask: ScoDoc
"""
from pprint import pprint as pp
import re
import sys
import click
import flask
from flask.cli import with_appcontext
from flask.templating import render_template
from flask_login import login_user, logout_user, current_user
import psycopg2
import sqlalchemy
import app as mapp
from app import create_app, cli, db
from app import initialize_scodoc_database
from app import clear_scodoc_cache
from app import models
from app.auth.models import User, Role, UserRole
from app.entreprises.models import entreprises_reset_database
from app.models import Departement, departements
from app.models import Formation, UniteEns, Matiere, Module
from app.models import FormSemestre, FormSemestreInscription
from app.models import GroupDescr
from app.models import Identite
from app.models import ModuleImpl, ModuleImplInscription
from app.models import Partition
from app.models import ScolarAutorisationInscription, ScolarFormSemestreValidation
from app.models.but_refcomp import (
ApcCompetence,
ApcNiveau,
ApcParcours,
ApcReferentielCompetences,
)
from app.models.but_validations import ApcValidationAnnee, ApcValidationRCUE
from app.models.evaluations import Evaluation
from app.scodoc import sco_dump_db
from app.scodoc.sco_logos import make_logo_local
from app.scodoc.sco_permissions import Permission
from app.views import notes, scolar
import app.scodoc.sco_utils as scu
import tools
from tools.fakedatabase import create_test_api_database
from config import RunningConfig
# Build the Flask application object from RunningConfig (imported from config),
# then hand it to cli.register() from the app package.
# Every decorator below (@app.cli.command, @app.context_processor, ...) is
# attached to this module-level `app`.
app = create_app(RunningConfig)
cli.register(app)
@app.context_processor
def inject_sco_utils():
    """Expose the ``scu`` module (app.scodoc.sco_utils) to all Jinja templates."""
    return {"scu": scu}
@app.shell_context_processor
def make_shell_context():
    """Return the names made available in the interactive ``flask shell``.

    The mapping gathers models, helper modules, the Flask-Login helpers and
    a ready-made test request context (``ctx``).
    """
    # These imports are only needed by the shell, hence local to the function.
    import numpy as np
    import pandas as pd
    import app as mapp  # the `app` package (the global `app` is the Flask app)
    from app.scodoc import notesdb as ndb
    from app.comp import res_sem
    from app.comp.res_but import ResultatsSemestreBUT
    from app.scodoc import sco_utils as scu

    shell_namespace = {
        "ApcCompetence": ApcCompetence,
        "ApcNiveau": ApcNiveau,
        "ApcParcours": ApcParcours,
        "ApcReferentielCompetences": ApcReferentielCompetences,
        "ApcValidationRCUE": ApcValidationRCUE,
        "ApcValidationAnnee": ApcValidationAnnee,
        "ctx": app.test_request_context(),
        "current_app": flask.current_app,
        "current_user": current_user,
        "Departement": Departement,
        "db": db,
        "Evaluation": Evaluation,
        "flask": flask,
        "Formation": Formation,
        "FormSemestre": FormSemestre,
        "FormSemestreInscription": FormSemestreInscription,
        "GroupDescr": GroupDescr,
        "Identite": Identite,
        "login_user": login_user,
        "logout_user": logout_user,
        "mapp": mapp,
        "Matiere": Matiere,
        "models": models,
        "Module": Module,
        "ModuleImpl": ModuleImpl,
        "ModuleImplInscription": ModuleImplInscription,
        "ndb": ndb,
        "notes": notes,
        "np": np,
        "Partition": Partition,
        "pd": pd,
        "Permission": Permission,
        "pp": pp,
        "res_sem": res_sem,
        "ResultatsSemestreBUT": ResultatsSemestreBUT,
        "Role": Role,
        "ScoDocSiteConfig": models.ScoDocSiteConfig,
        "scolar": scolar,
        "ScolarAutorisationInscription": ScolarAutorisationInscription,
        "ScolarFormSemestreValidation": ScolarFormSemestreValidation,
        "ScolarNews": models.ScolarNews,
        "scu": scu,
        "UniteEns": UniteEns,
        "User": User,
        "UserRole": UserRole,
    }
    return shell_namespace
    # Recipe kept from the original author (not executed): from the shell,
    # one can activate the context and log in as admin with
    #   ctx.push()
    #   admin = User.query.filter_by(user_name="admin").first()
    #   login_user(admin)
@app.cli.command()
@click.option("--erase/--no-erase", default=False)
def sco_db_init(erase=False):  # sco-db-init
    """Initialize the database.

    Starts from an existing database and creates all
    the necessary SQL tables and functions.
    """
    # The admin mail must be configured before the database is initialized.
    if not app.config.get("SCODOC_ADMIN_MAIL"):
        # Message ends with a newline, like the other commands' error
        # messages, so the shell prompt does not run onto it.
        sys.stderr.write(
            """La variable SCODOC_ADMIN_MAIL n'est pas positionnée: vérifier votre .env\n"""
        )
        return 100
    initialize_scodoc_database(erase=erase)
@app.cli.command()
@click.argument("database")
def anonymize_db(database):  # anonymize-db
    """Anonymize the database with the given name (NOT the current database!)"""
    warning = f"L'anonymisation va affecter la base {database} et PERDRE beaucoup de données.\nContinuer ?"
    # abort=True: a negative answer stops the command here.
    click.confirm(warning, abort=True)
    sco_dump_db.anonymize_db(database)
    click.echo(f"Base {database} pseudonymisée")
@app.cli.command()
def user_db_clear():
    """Erase all users and roles from the database (asks for confirmation)."""
    click.echo("Erasing the users database !")
    # The confirmation prompt and the deletion itself live in the helper.
    _clear_users_db()
def _clear_users_db():
    """Delete every UserRole row, then every User row, after confirmation.

    The session is rolled back and the error re-raised if anything fails.
    """
    click.confirm(
        "This will erase all users and roles.\nAre you sure you want to continue?",
        abort=True,
    )
    db.reflect()
    try:
        # UserRole rows are removed before the User rows.
        for model in (UserRole, User):
            db.session.query(model).delete()
        db.session.commit()
    except BaseException:
        # Same reach as a bare `except`: always roll back, then propagate.
        db.session.rollback()
        raise
@app.cli.command()
@click.argument("username")
@click.argument("role")
@click.argument("dept")
@click.option("-n", "--nom", "nom")
@click.option("-p", "--prenom", "prenom")
def user_create(username, role, dept, nom=None, prenom=None):  # user-create
    """Create a new user with ROLE in DEPT (use @all for no specific dept)."""
    named_role = Role.get_named_role(role)
    if not named_role:
        sys.stderr.write(f"user_create: role {role} does not exist\n")
        return 1
    existing = User.query.filter_by(user_name=username).first()
    if existing:
        sys.stderr.write(f"user_create: user {existing} already exists\n")
        return 2
    # "@all" is the command-line spelling for "no department" (None).
    dept = None if dept == "@all" else dept
    new_user = User(user_name=username, dept=dept, nom=nom, prenom=prenom)
    new_user.add_role(named_role, dept)
    db.session.add(new_user)
    db.session.commit()
    click.echo(
        f"created user, login: {new_user.user_name}, with role {named_role} in dept. {dept}"
    )
@app.cli.command()
@click.argument("username")
def user_delete(username):  # user-delete
    """Try to delete this user. Fails if ScoDoc objects still reference it."""
    target = User.query.filter_by(user_name=username).first()
    if not target:
        sys.stderr.write(f"user_delete: user {username} not found\n")
        return 2
    db.session.delete(target)
    try:
        db.session.commit()
    except (sqlalchemy.exc.IntegrityError, psycopg2.errors.ForeignKeyViolation):
        # The commit was rejected: other rows still point to this user.
        failure = f"\nuser_delete: ne peux pas supprimer l'utilisateur {username}\ncar il est associé à des objets dans ScoDoc (modules, notes, ...).\n"
        sys.stderr.write(failure)
        return 1
    else:
        click.echo(f"deleted user, login: {username}")
@app.cli.command()
@click.argument("username")
@click.password_option()
def user_password(username, password=None):  # user-password
    """Set (or change) user's password"""
    if not password:
        # Newline-terminated, consistent with the other error messages.
        sys.stderr.write("user_password: missing password\n")
        return 1
    u = User.query.filter_by(user_name=username).first()
    if not u:
        sys.stderr.write(f"user_password: user {username} does not exists\n")
        return 1
    u.set_password(password)
    db.session.add(u)
    db.session.commit()
    click.echo(f"changed password for user {u}")
@app.cli.command()
@click.argument("rolename")
@click.argument("permissions", nargs=-1)
def create_role(rolename, permissions):  # create-role
    """Create a new role, optionally with a list of permission names."""
    # Check rolename: letters and digits only.
    if not re.match(r"^[a-zA-Z0-9]+$", rolename):
        sys.stderr.write(f"create_role: invalid rolename {rolename}\n")
        return 1
    # Check permissions: every name must resolve before anything is created.
    permission_list = []
    for permission_name in permissions:
        perm = Permission.get_by_name(permission_name)
        if not perm:
            # Report the name the user typed; `perm` is the failed lookup
            # result and would hide which name is wrong.
            sys.stderr.write(
                f"create_role: invalid permission name {permission_name}\n"
            )
            sys.stderr.write(
                f"\tavailable permissions: {', '.join(Permission.permission_by_name)}.\n"
            )
            return 1
        permission_list.append(perm)
    role = Role.query.filter_by(name=rolename).first()
    if role:
        sys.stderr.write(f"create_role: role {rolename} already exists\n")
        return 1
    role = Role(name=rolename)
    for perm in permission_list:
        role.add_permission(perm)
    db.session.add(role)
    db.session.commit()
@app.cli.command()
def list_roles():  # list-roles
    """Print every defined role, one per line."""
    for known_role in Role.query:
        print(known_role)
@app.cli.command()
@click.argument("rolename")
@click.option("-a", "--add", "addpermissionname")
@click.option("-r", "--remove", "removepermissionname")
def edit_role(rolename, addpermissionname=None, removepermissionname=None): # edit-role
"""Add [-a] and/or remove [-r] a permission to/from a role.
In ScoDoc, permissions are not associated to users but to roles.
Each user has a set of roles in each departement.
Example: `flask edit-role -a ScoEditApo Ens`
"""
# Resolve the permission to add, if requested; an unknown name is an error.
if addpermissionname:
perm_to_add = Permission.get_by_name(addpermissionname)
if not perm_to_add:
sys.stderr.write(
f"edit_role: permission {addpermissionname} does not exists\n"
)
return 1
else:
perm_to_add = None
# Same resolution for the permission to remove.
if removepermissionname:
perm_to_remove = Permission.get_by_name(removepermissionname)
if not perm_to_remove:
sys.stderr.write(
f"edit_role: permission {removepermissionname} does not exists\n"
)
return 1
else:
perm_to_remove = None
# The role is looked up only after both permission names have been validated.
role = Role.query.filter_by(name=rolename).first()
if not role:
sys.stderr.write(f"edit_role: role {rolename} does not exists\n")
return 1
if perm_to_add:
role.add_permission(perm_to_add)
click.echo(f"adding permission {addpermissionname} to role {rolename}")
if perm_to_remove:
role.remove_permission(perm_to_remove)
click.echo(f"removing permission {removepermissionname} from role {rolename}")
# Write to the database only when something was actually changed.
if perm_to_add or perm_to_remove:
db.session.add(role)
db.session.commit()
print(role)
@app.cli.command()
@click.argument("rolename")
def delete_role(rolename):
    """Delete the role with the given name."""
    doomed = Role.query.filter_by(name=rolename).first()
    if doomed is None:
        sys.stderr.write(f"delete_role: role {rolename} does not exists\n")
        return 1
    db.session.delete(doomed)
    db.session.commit()
@app.cli.command()
@click.argument("username")
@click.option("-d", "--dept", "dept_acronym")
@click.option("-a", "--add", "add_role_name")
@click.option("-r", "--remove", "remove_role_name")
def user_role(username, dept_acronym=None, add_role_name=None, remove_role_name=None):
    """Add or remove a role to the given user in the given dept.

    Called with a username only, display the roles of that user.
    """
    user: User = User.query.filter_by(user_name=username).first()
    if not user:
        sys.stderr.write(f"user_role: user {username} does not exists\n")
        return 1
    # Without any option, display the user's roles
    options = (dept_acronym, add_role_name, remove_role_name)
    if all(option is None for option in options):
        print(f"Roles for user {user.user_name}")
        ordered = sorted(
            user.user_roles, key=lambda ur: (ur.dept or "", ur.role.name)
        )
        for assignment in ordered:
            dept_label = assignment.dept or "tous"
            print(f"{dept_label}:\t{assignment.role.name}")
    if dept_acronym:
        # The departement must exist; only its acronym is used afterwards.
        known_dept = models.Departement.query.filter_by(acronym=dept_acronym).first()
        if known_dept is None:
            sys.stderr.write(f"Erreur: le departement {dept_acronym} n'existe pas !\n")
            return 2
    if add_role_name:
        role_to_add = Role.query.filter_by(name=add_role_name).first()
        if role_to_add is None:
            sys.stderr.write(
                f"user_role: role {add_role_name} does not exists\n(use list-roles to display existing roles)\n"
            )
            return 2
        user.add_role(role_to_add, dept_acronym)
    if remove_role_name:
        role_to_remove = Role.query.filter_by(name=remove_role_name).first()
        if role_to_remove is None:
            sys.stderr.write(f"user_role: role {remove_role_name} does not exists\n")
            return 2
        criteria = (
            UserRole.role == role_to_remove,
            UserRole.user == user,
            UserRole.dept == dept_acronym,
        )
        assignment = UserRole.query.filter(*criteria).first()
        if assignment:
            db.session.delete(assignment)
    db.session.commit()
def abort_if_false(ctx, param, value):
    """Option callback: call ``ctx.abort()`` unless *value* is truthy.

    *param* is part of the callback signature but is not used.
    """
    if value:
        return
    ctx.abort()
@app.cli.command()
@click.option(
    "-y",
    "--yes",
    is_flag=True,
    callback=abort_if_false,
    expose_value=False,
    prompt="""Attention: Cela va effacer toutes les données du département
(étudiants, notes, formations, etc).
Voulez-vous vraiment continuer ?
""",
)
@click.option(
    "-f",
    "--force",
    is_flag=True,
    help="ignore non-existing departement",
)
@click.argument("dept")
def delete_dept(dept, force=False):  # delete-dept
    """Delete an existing departement (all its data is erased)."""
    from app.scodoc import notesdb as ndb
    from app.scodoc import sco_dept

    db.reflect()
    ndb.open_db_connection()
    departement = models.Departement.query.filter_by(acronym=dept).first()
    if departement is None and not force:
        sys.stderr.write(f"Erreur: le departement {dept} n'existe pas !\n")
        return 2
    # With --force, a missing departement is ignored and nothing is deleted.
    error_message = sco_dept.delete_dept(departement.id) if departement else ""
    db.session.commit()
    if error_message:
        print(f"Erreur:\n {error_message}")
        return 1
    return 0
@app.cli.command()
@click.argument("dept")
def create_dept(dept):  # create-dept
    """Create a new departement named DEPT."""
    # The value returned by departements.create_dept is not needed here.
    departements.create_dept(dept)
    return 0
@app.cli.command()
@click.argument("depts", nargs=-1)
def list_depts(depts=""):  # list-depts
    """Print the id and acronym of each requested dept that exists.

    Called without arguments, list all depts along with their ids.
    """
    by_id = models.Departement.query.order_by(models.Departement.id)
    for departement in by_id:
        # A non-empty selection filters the listing on acronyms.
        if depts and departement.acronym not in depts:
            continue
        print(f"{departement.id}\t{departement.acronym}")
@app.cli.command()
@click.option(
    "-n",
    "--name",
    is_flag=True,
    help="show database name instead of connexion string (required for "
    "dropdb/createdb commands)",
)
def scodoc_database(name):  # scodoc-database
    """Print the database connexion string (or only its name with -n)."""
    uri = app.config["SQLALCHEMY_DATABASE_URI"]
    # The database name is whatever follows the last "/" of the URI.
    print(uri.rsplit("/", 1)[-1] if name else uri)
@app.cli.command()
@with_appcontext
def import_scodoc7_users():  # import-scodoc7-users
    """Import users defined in ScoDoc7 postgresql database into ScoDoc 9

    The old database SCOUSERS must be alive and readable by the current user.
    This script is typically run as unix user "scodoc".
    The original SCOUSERS database is left unmodified.
    """
    warning_lines = tools.import_scodoc7_user_db()
    click.echo("----")
    click.echo(f"import terminé: {len(warning_lines)} warnings\n")
    report = "\n".join(warning_lines) + "\n"
    click.echo(report)
@app.cli.command()
@click.argument("dept")
@click.argument("dept_db_name")
@with_appcontext
def import_scodoc7_dept(dept: str, dept_db_name: str = ""):  # import-scodoc7-dept
    """Import a ScoDoc 7 departement: dept: InfoComm, dept_db_name: SCOINFOCOMM"""
    # The old database is addressed by name, without host or credentials.
    tools.import_scodoc7_dept(dept, f"postgresql:///{dept_db_name}")
@app.cli.command()
@click.argument("dept", default="")
@with_appcontext
def migrate_scodoc7_dept_archives(dept: str):  # migrate-scodoc7-dept-archives
    """Post-migration: rename the archives according to ScoDoc 9 ids"""
    # DEPT defaults to the empty string and is passed through unchanged.
    tools.migrate_scodoc7_dept_archives(dept)
@app.cli.command()
@click.argument("dept", default="")
@with_appcontext
def migrate_scodoc7_dept_logos(dept: str = ""):  # migrate-scodoc7-dept-logos
    """Post-migration: rename the logos according to ScoDoc 9 ids / depts"""
    # DEPT defaults to the empty string and is passed through unchanged.
    tools.migrate_scodoc7_dept_logos(dept)
@app.cli.command()
@click.argument("logo", default=None)
@click.argument("dept", default=None)
@with_appcontext
def localize_logo(logo: str = None, dept: str = None):  # localize-logo
    """Make a global logo local to a dept (both logo and dept names are mandatory)"""
    # These two logos are refused here; the user is told to go through
    # the configuration form instead.
    refused = ("header", "footer")
    if logo in refused:
        print(
            f"Can't make logo '{logo}' local: add a local version throught configuration form instead"
        )
        return
    make_logo_local(logoname=logo, dept_name=dept)
@app.cli.command()
@click.argument("formsemestre_id", type=click.INT)
@click.argument("xlsfile", type=click.File("rb"))
@click.argument("zipfile", type=click.File("rb"))
def photos_import_files(formsemestre_id: int, xlsfile: str, zipfile: str):
"""Import student photos from an Excel list and a zip archive of images."""
# NOTE(review): xlsfile and zipfile are declared with click.File("rb"), so
# they are presumably open binary file objects rather than the `str` of the
# annotations -- confirm before relying on the type hints.
from app.scodoc import sco_trombino, sco_photos
from app.auth.models import get_super_admin
formsemestre = db.session.get(FormSemestre, formsemestre_id)
if not formsemestre:
sys.stderr.write("photos-import-files: formsemestre_id invalide\n")
return 2
# Work inside a test request context, in the formsemestre's departement,
# logged in as the super admin.
with app.test_request_context():
mapp.set_sco_dept(formsemestre.departement.acronym)
admin_user = get_super_admin()
login_user(admin_user)
# Handed to the importer as `callback`; it stores one photo via sco_photos.
def callback(etud, data, filename):
return sco_photos.store_photo(etud, data, filename)
(
ignored_zipfiles,
unmatched_files,
stored_etud_filename,
) = sco_trombino.zip_excel_import_files(
xlsfile=xlsfile,
zipfile=zipfile,
callback=callback,
filename_title="fichier_photo",
)
# Print a text report rendered from the three lists returned above.
print(
render_template(
"scolar/photos_import_files.txt",
ignored_zipfiles=ignored_zipfiles,
unmatched_files=unmatched_files,
stored_etud_filename=stored_etud_filename,
)
)
@app.cli.command()
@click.option("--sanitize/--no-sanitize", default=False)
@with_appcontext
def clear_cache(sanitize):  # clear-cache
    """Clear ScoDoc cache

    This cache (currently Redis) is persistent between invocation
    and it may be necessary to clear it during upgrades,
    development or tests.
    """
    click.echo("Flushing Redis cache...")
    clear_scodoc_cache()
    if not sanitize:
        return
    # --sanitize: additionally run the sanitizer on every formation.
    click.echo("Checking formations...")
    for each_formation in Formation.query:
        each_formation.sanitize_old_formation()
@app.cli.command()
def init_test_database():  # init-test-database
    """Initialize the database objects used by the API tests

    (to be applied on SCODOC_TEST or SCODOC_DEV)
    """
    click.echo("Initialisation base de test API...")
    # A request context is pushed (and left active) before logging in.
    request_ctx = app.test_request_context()
    request_ctx.push()
    admin_account = User.query.filter_by(user_name="admin").first()
    login_user(admin_account)
    create_test_api_database.init_test_database()
def recursive_help(cmd, parent=None):
    """Print the help of *cmd*, then of all its sub-commands, depth first.

    *parent* is the click context of the enclosing command, if any.
    """
    ctx = click.core.Context(cmd, info_name=cmd.name, parent=parent)
    print(cmd.get_help(ctx))
    print()
    # Commands without a `commands` attribute have no sub-commands to visit.
    for subcommand in getattr(cmd, "commands", {}).values():
        recursive_help(subcommand, ctx)
@app.cli.command()
def entreprises_reset_db():
    """Reset the tables of the 'entreprises' (company relations) module"""
    question = "This will erase all data from the blueprint 'entreprises'.\nAre you sure you want to continue?"
    click.confirm(question, abort=True)
    db.reflect()
    try:
        entreprises_reset_database()
    except BaseException:
        # Same reach as a bare `except`: roll back, then propagate the error.
        db.session.rollback()
        raise
@app.cli.command()
def dumphelp():
    """Generate the complete help page for the documentation."""
    # Start from the root command group and walk down through sub-commands.
    recursive_help(app.cli)
@app.cli.command()
@click.option("-h", "--host", default="127.0.0.1", help="The interface to bind to.")
@click.option("-p", "--port", default=5000, help="The port to bind to.")
@click.option(
    "--length",
    default=25,
    help="Number of functions to include in the profiler report.",
)
@click.option(
    "--profile-dir", default=None, help="Directory where profiler data files are saved."
)
def profile(host, port, length, profile_dir):
    """Start the application under the code profiler."""
    from werkzeug.middleware.profiler import ProfilerMiddleware
    from werkzeug.serving import run_simple

    # Wrap the WSGI entry point so that requests go through the profiler.
    profiled_wsgi = ProfilerMiddleware(
        app.wsgi_app, restrictions=[length], profile_dir=profile_dir
    )
    app.wsgi_app = profiled_wsgi
    # use run_simple instead of app.run()
    run_simple(host, port, app, use_debugger=False)