592 lines
18 KiB
Python
Executable File
592 lines
18 KiB
Python
Executable File
# -*- coding: UTF-8 -*-
|
|
|
|
|
|
"""Application Flask: ScoDoc
|
|
|
|
|
|
"""
|
|
|
|
from pprint import pprint as pp
|
|
import re
|
|
import sys
|
|
|
|
import click
|
|
import flask
|
|
from flask.cli import with_appcontext
|
|
from flask.templating import render_template
|
|
from flask_login import login_user, logout_user, current_user
|
|
import psycopg2
|
|
import sqlalchemy
|
|
|
|
from app import create_app, cli, db
|
|
from app import initialize_scodoc_database
|
|
from app import clear_scodoc_cache
|
|
from app import models
|
|
|
|
from app.auth.models import User, Role, UserRole
|
|
from app.scodoc.sco_logos import make_logo_local
|
|
from app.models import Formation, UniteEns, Matiere, Module
|
|
from app.models import FormSemestre, FormSemestreInscription
|
|
from app.models import ModuleImpl, ModuleImplInscription
|
|
from app.models import Identite
|
|
from app.models import departements
|
|
from app.models.evaluations import Evaluation
|
|
from app.entreprises.models import entreprises_reset_database
|
|
from app.scodoc.sco_permissions import Permission
|
|
from app.views import notes, scolar
|
|
import tools
|
|
from tools.fakedatabase import create_test_api_database
|
|
|
|
from config import RunningConfig
|
|
|
|
app = create_app(RunningConfig)
|
|
cli.register(app)
|
|
|
|
|
|
@app.shell_context_processor
|
|
def make_shell_context():
|
|
import numpy as np
|
|
import pandas as pd
|
|
import app as mapp # le package app
|
|
from app.scodoc import notesdb as ndb
|
|
from app.comp import res_sem
|
|
from app.scodoc import sco_utils as scu
|
|
|
|
return {
|
|
"ctx": app.test_request_context(),
|
|
"current_app": flask.current_app,
|
|
"current_user": current_user,
|
|
"db": db,
|
|
"Evaluation": Evaluation,
|
|
"flask": flask,
|
|
"Formation": Formation,
|
|
"FormSemestre": FormSemestre,
|
|
"FormSemestreInscription": FormSemestreInscription,
|
|
"Identite": Identite,
|
|
"login_user": login_user,
|
|
"logout_user": logout_user,
|
|
"mapp": mapp,
|
|
"models": models,
|
|
"Matiere": Matiere,
|
|
"Module": Module,
|
|
"ModuleImpl": ModuleImpl,
|
|
"ModuleImplInscription": ModuleImplInscription,
|
|
"ndb": ndb,
|
|
"notes": notes,
|
|
"np": np,
|
|
"pd": pd,
|
|
"Permission": Permission,
|
|
"pp": pp,
|
|
"Role": Role,
|
|
"res_sem": res_sem,
|
|
"scolar": scolar,
|
|
"ScolarNews": models.ScolarNews,
|
|
"scu": scu,
|
|
"UniteEns": UniteEns,
|
|
"User": User,
|
|
"UserRole": UserRole,
|
|
}
|
|
|
|
|
|
# ctx.push()
|
|
# admin = User.query.filter_by(user_name="admin").first()
|
|
# login_user(admin)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.option("--erase/--no-erase", default=False)
|
|
def sco_db_init(erase=False): # sco-db-init
|
|
"""Initialize the database.
|
|
Starts from an existing database and create all
|
|
the necessary SQL tables and functions.
|
|
"""
|
|
if not app.config.get("SCODOC_ADMIN_MAIL"):
|
|
sys.stderr.write(
|
|
"""La variable SCODOC_ADMIN_MAIL n'est pas positionnée: vérifier votre .env"""
|
|
)
|
|
return 100
|
|
initialize_scodoc_database(erase=erase)
|
|
|
|
|
|
@app.cli.command()
|
|
def user_db_clear():
|
|
"""Erase all users and roles from the database !"""
|
|
click.echo("Erasing the users database !")
|
|
_clear_users_db()
|
|
|
|
|
|
def _clear_users_db():
|
|
"""Erase (drop) all tables of users database !"""
|
|
click.confirm(
|
|
"This will erase all users and roles.\nAre you sure you want to continue?",
|
|
abort=True,
|
|
)
|
|
db.reflect()
|
|
try:
|
|
db.session.query(UserRole).delete()
|
|
db.session.query(User).delete()
|
|
db.session.commit()
|
|
except:
|
|
db.session.rollback()
|
|
raise
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("username")
|
|
@click.argument("role")
|
|
@click.argument("dept")
|
|
@click.option("-n", "--nom", "nom")
|
|
@click.option("-p", "--prenom", "prenom")
|
|
def user_create(username, role, dept, nom=None, prenom=None): # user-create
|
|
"Create a new user"
|
|
r = Role.get_named_role(role)
|
|
if not r:
|
|
sys.stderr.write(f"user_create: role {role} does not exist\n")
|
|
return 1
|
|
u = User.query.filter_by(user_name=username).first()
|
|
if u:
|
|
sys.stderr.write(f"user_create: user {u} already exists\n")
|
|
return 2
|
|
if dept == "@all":
|
|
dept = None
|
|
u = User(user_name=username, dept=dept, nom=nom, prenom=prenom)
|
|
u.add_role(r, dept)
|
|
db.session.add(u)
|
|
db.session.commit()
|
|
click.echo(f"created user, login: {u.user_name}, with role {r} in dept. {dept}")
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("username")
|
|
def user_delete(username): # user-delete
|
|
"Try to delete this user. Fails if it's associated to some scodoc objects."
|
|
u = User.query.filter_by(user_name=username).first()
|
|
if not u:
|
|
sys.stderr.write(f"user_delete: user {username} not found\n")
|
|
return 2
|
|
db.session.delete(u)
|
|
try:
|
|
db.session.commit()
|
|
except (sqlalchemy.exc.IntegrityError, psycopg2.errors.ForeignKeyViolation):
|
|
sys.stderr.write(
|
|
f"""\nuser_delete: ne peux pas supprimer l'utilisateur {username}\ncar il est associé à des objets dans ScoDoc (modules, notes, ...).\n"""
|
|
)
|
|
return 1
|
|
click.echo(f"deleted user, login: {username}")
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("username")
|
|
@click.password_option()
|
|
def user_password(username, password=None): # user-password
|
|
"Set (or change) user's password"
|
|
if not password:
|
|
sys.stderr.write("user_password: missing password")
|
|
return 1
|
|
u = User.query.filter_by(user_name=username).first()
|
|
if not u:
|
|
sys.stderr.write(f"user_password: user {username} does not exists\n")
|
|
return 1
|
|
|
|
u.set_password(password)
|
|
db.session.add(u)
|
|
db.session.commit()
|
|
click.echo(f"changed password for user {u}")
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("rolename")
|
|
@click.argument("permissions", nargs=-1)
|
|
def create_role(rolename, permissions): # create-role
|
|
"""Create a new role"""
|
|
# Check rolename
|
|
if not re.match(r"^[a-zA-Z0-9]+$", rolename):
|
|
sys.stderr.write(f"create_role: invalid rolename {rolename}\n")
|
|
return 1
|
|
# Check permissions
|
|
permission_list = []
|
|
for permission_name in permissions:
|
|
perm = Permission.get_by_name(permission_name)
|
|
if not perm:
|
|
sys.stderr.write(f"create_role: invalid permission name {perm}\n")
|
|
sys.stderr.write(
|
|
f"\tavailable permissions: {', '.join([ name for name in Permission.permission_by_name])}.\n"
|
|
)
|
|
return 1
|
|
permission_list.append(perm)
|
|
|
|
role = Role.query.filter_by(name=rolename).first()
|
|
if role:
|
|
sys.stderr.write(f"create_role: role {rolename} already exists\n")
|
|
return 1
|
|
|
|
role = Role(name=rolename)
|
|
for perm in permission_list:
|
|
role.add_permission(perm)
|
|
db.session.add(role)
|
|
db.session.commit()
|
|
|
|
|
|
@app.cli.command()
|
|
def list_roles(): # list-roles
|
|
"""List all defined roles"""
|
|
for role in Role.query:
|
|
print(role)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("rolename")
|
|
@click.option("-a", "--add", "addpermissionname")
|
|
@click.option("-r", "--remove", "removepermissionname")
|
|
def edit_role(rolename, addpermissionname=None, removepermissionname=None): # edit-role
|
|
"""Add [-a] and/or remove [-r] a permission to/from a role.
|
|
In ScoDoc, permissions are not associated to users but to roles.
|
|
Each user has a set of roles in each departement.
|
|
|
|
Example: `flask edit-role -a ScoEditApo Ens`
|
|
"""
|
|
if addpermissionname:
|
|
perm_to_add = Permission.get_by_name(addpermissionname)
|
|
if not perm_to_add:
|
|
sys.stderr.write(
|
|
f"edit_role: permission {addpermissionname} does not exists\n"
|
|
)
|
|
return 1
|
|
else:
|
|
perm_to_add = None
|
|
if removepermissionname:
|
|
perm_to_remove = Permission.get_by_name(removepermissionname)
|
|
if not perm_to_remove:
|
|
sys.stderr.write(
|
|
f"edit_role: permission {removepermissionname} does not exists\n"
|
|
)
|
|
return 1
|
|
else:
|
|
perm_to_remove = None
|
|
role = Role.query.filter_by(name=rolename).first()
|
|
if not role:
|
|
sys.stderr.write(f"edit_role: role {rolename} does not exists\n")
|
|
return 1
|
|
if perm_to_add:
|
|
role.add_permission(perm_to_add)
|
|
click.echo(f"adding permission {addpermissionname} to role {rolename}")
|
|
if perm_to_remove:
|
|
role.remove_permission(perm_to_remove)
|
|
click.echo(f"removing permission {removepermissionname} from role {rolename}")
|
|
if perm_to_add or perm_to_remove:
|
|
db.session.add(role)
|
|
db.session.commit()
|
|
print(role)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("rolename")
|
|
def delete_role(rolename):
|
|
"""Delete a role"""
|
|
role = Role.query.filter_by(name=rolename).first()
|
|
if role is None:
|
|
sys.stderr.write(f"delete_role: role {rolename} does not exists\n")
|
|
return 1
|
|
db.session.delete(role)
|
|
db.session.commit()
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("username")
|
|
@click.option("-d", "--dept", "dept_acronym")
|
|
@click.option("-a", "--add", "add_role_name")
|
|
@click.option("-r", "--remove", "remove_role_name")
|
|
def user_role(username, dept_acronym=None, add_role_name=None, remove_role_name=None):
|
|
"""Add or remove a role to the given user in the given dept"""
|
|
user: User = User.query.filter_by(user_name=username).first()
|
|
if not user:
|
|
sys.stderr.write(f"user_role: user {username} does not exists\n")
|
|
return 1
|
|
# Sans argument, affiche les rôles de l'utilisateur
|
|
if dept_acronym is None and add_role_name is None and remove_role_name is None:
|
|
print(f"Roles for user {user.user_name}")
|
|
for user_role in sorted(
|
|
user.user_roles, key=lambda ur: (ur.dept or "", ur.role.name)
|
|
):
|
|
print(f"""{user_role.dept or "tous"}:\t{user_role.role.name}""")
|
|
|
|
if dept_acronym:
|
|
dept = models.Departement.query.filter_by(acronym=dept_acronym).first()
|
|
if dept is None:
|
|
sys.stderr.write(f"Erreur: le departement {dept_acronym} n'existe pas !\n")
|
|
return 2
|
|
|
|
if add_role_name:
|
|
role = Role.query.filter_by(name=add_role_name).first()
|
|
if role is None:
|
|
sys.stderr.write(
|
|
f"""user_role: role {add_role_name} does not exists
|
|
(use list-roles to display existing roles)\n"""
|
|
)
|
|
return 2
|
|
user.add_role(role, dept_acronym)
|
|
if remove_role_name:
|
|
role = Role.query.filter_by(name=remove_role_name).first()
|
|
if role is None:
|
|
sys.stderr.write(f"user_role: role {remove_role_name} does not exists\n")
|
|
return 2
|
|
user_role = UserRole.query.filter(
|
|
UserRole.role == role, UserRole.user == user, UserRole.dept == dept_acronym
|
|
).first()
|
|
db.session.delete(user_role)
|
|
db.session.commit()
|
|
|
|
|
|
def abort_if_false(ctx, param, value):
|
|
if not value:
|
|
ctx.abort()
|
|
|
|
|
|
@app.cli.command()
|
|
@click.option(
|
|
"--yes",
|
|
is_flag=True,
|
|
callback=abort_if_false,
|
|
expose_value=False,
|
|
prompt=f"""Attention: Cela va effacer toutes les données du département
|
|
(étudiants, notes, formations, etc)
|
|
Voulez-vous vraiment continuer ?
|
|
""",
|
|
)
|
|
@click.argument("dept")
|
|
def delete_dept(dept): # delete-dept
|
|
"""Delete existing departement"""
|
|
from app.scodoc import notesdb as ndb
|
|
from app.scodoc import sco_dept
|
|
|
|
db.reflect()
|
|
ndb.open_db_connection()
|
|
d = models.Departement.query.filter_by(acronym=dept).first()
|
|
if d is None:
|
|
sys.stderr.write(f"Erreur: le departement {dept} n'existe pas !\n")
|
|
return 2
|
|
sco_dept.delete_dept(d.id)
|
|
db.session.commit()
|
|
return 0
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("dept")
|
|
def create_dept(dept): # create-dept
|
|
"Create new departement"
|
|
_ = departements.create_dept(dept)
|
|
return 0
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("depts", nargs=-1)
|
|
def list_depts(depts=""): # list-depts
|
|
"""If dept exists, print it, else nothing.
|
|
Called without arguments, list all depts along with their ids.
|
|
"""
|
|
for dept in models.Departement.query.order_by(models.Departement.id):
|
|
if not depts or dept.acronym in depts:
|
|
print(f"{dept.id}\t{dept.acronym}")
|
|
|
|
|
|
@app.cli.command()
|
|
@click.option(
|
|
"-n",
|
|
"--name",
|
|
is_flag=True,
|
|
help="show database name instead of connexion string (required for "
|
|
"dropdb/createdb commands)",
|
|
)
|
|
def scodoc_database(name): # list-dept
|
|
"""print the database connexion string"""
|
|
uri = app.config["SQLALCHEMY_DATABASE_URI"]
|
|
if name:
|
|
print(uri.split("/")[-1])
|
|
else:
|
|
print(uri)
|
|
|
|
|
|
@app.cli.command()
|
|
@with_appcontext
|
|
def import_scodoc7_users(): # import-scodoc7-users
|
|
"""Import users defined in ScoDoc7 postgresql database into ScoDoc 9
|
|
The old database SCOUSERS must be alive and readable by the current user.
|
|
This script is typically run as unix user "scodoc".
|
|
The original SCOUSERS database is left unmodified.
|
|
"""
|
|
messages = tools.import_scodoc7_user_db()
|
|
click.echo("----")
|
|
click.echo(f"import terminé: {len(messages)} warnings\n")
|
|
click.echo("\n".join(messages) + "\n")
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("dept")
|
|
@click.argument("dept_db_name")
|
|
@with_appcontext
|
|
def import_scodoc7_dept(dept: str, dept_db_name: str = ""): # import-scodoc7-dept
|
|
"""Import département ScoDoc 7: dept: InfoComm, dept_db_name: SCOINFOCOMM"""
|
|
dept_db_uri = f"postgresql:///{dept_db_name}"
|
|
tools.import_scodoc7_dept(dept, dept_db_uri)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("dept", default="")
|
|
@with_appcontext
|
|
def migrate_scodoc7_dept_archives(dept: str): # migrate-scodoc7-dept-archives
|
|
"""Post-migration: renomme les archives en fonction des id de ScoDoc 9"""
|
|
tools.migrate_scodoc7_dept_archives(dept)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("dept", default="")
|
|
@with_appcontext
|
|
def migrate_scodoc7_dept_logos(dept: str = ""): # migrate-scodoc7-dept-logos
|
|
"""Post-migration: renomme les logos en fonction des id / dept de ScoDoc 9"""
|
|
tools.migrate_scodoc7_dept_logos(dept)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("logo", default=None)
|
|
@click.argument("dept", default=None)
|
|
@with_appcontext
|
|
def localize_logo(logo: str = None, dept: str = None): # migrate-scodoc7-dept-logos
|
|
"""Make local to a dept a global logo (both logo and dept names are mandatory)"""
|
|
if logo in ["header", "footer"]:
|
|
print(
|
|
f"Can't make logo '{logo}' local: add a local version throught configuration form instead"
|
|
)
|
|
return
|
|
make_logo_local(logoname=logo, dept_name=dept)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.argument("formsemestre_id", type=click.INT)
|
|
@click.argument("xlsfile", type=click.File("rb"))
|
|
@click.argument("zipfile", type=click.File("rb"))
|
|
def photos_import_files(formsemestre_id: int, xlsfile: str, zipfile: str):
|
|
"""Import des photos d'étudiants à partir d'une liste excel et d'un zip avec les images."""
|
|
import app as mapp
|
|
from app.scodoc import sco_trombino, sco_photos
|
|
from app.scodoc import notesdb as ndb
|
|
from flask_login import login_user
|
|
from app.auth.models import get_super_admin
|
|
|
|
sem = mapp.models.formsemestre.FormSemestre.query.get(formsemestre_id)
|
|
if not sem:
|
|
sys.stderr.write("photos-import-files: numéro de semestre invalide\n")
|
|
return 2
|
|
|
|
with app.test_request_context():
|
|
mapp.set_sco_dept(sem.departement.acronym)
|
|
admin_user = get_super_admin()
|
|
login_user(admin_user)
|
|
|
|
def callback(etud, data, filename):
|
|
sco_photos.store_photo(etud, data)
|
|
|
|
(
|
|
ignored_zipfiles,
|
|
unmatched_files,
|
|
stored_etud_filename,
|
|
) = sco_trombino.zip_excel_import_files(
|
|
xlsfile=xlsfile,
|
|
zipfile=zipfile,
|
|
callback=callback,
|
|
filename_title="fichier_photo",
|
|
)
|
|
print(
|
|
render_template(
|
|
"scolar/photos_import_files.txt",
|
|
ignored_zipfiles=ignored_zipfiles,
|
|
unmatched_files=unmatched_files,
|
|
stored_etud_filename=stored_etud_filename,
|
|
)
|
|
)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.option("--sanitize/--no-sanitize", default=False)
|
|
@with_appcontext
|
|
def clear_cache(sanitize): # clear-cache
|
|
"""Clear ScoDoc cache
|
|
This cache (currently Redis) is persistent between invocation
|
|
and it may be necessary to clear it during upgrades,
|
|
development or tests.
|
|
"""
|
|
click.echo("Flushing Redis cache...")
|
|
clear_scodoc_cache()
|
|
if sanitize:
|
|
# sanitizes all formations:
|
|
click.echo("Checking formations...")
|
|
for formation in Formation.query:
|
|
formation.sanitize_old_formation()
|
|
|
|
|
|
@app.cli.command()
|
|
def init_test_database(): # init-test-database
|
|
"""Initialise les objets en base pour les tests API
|
|
(à appliquer sur SCODOC_TEST ou SCODOC_DEV)
|
|
"""
|
|
click.echo("Initialisation base de test API...")
|
|
|
|
ctx = app.test_request_context()
|
|
ctx.push()
|
|
admin = User.query.filter_by(user_name="admin").first()
|
|
login_user(admin)
|
|
create_test_api_database.init_test_database()
|
|
|
|
|
|
def recursive_help(cmd, parent=None):
|
|
ctx = click.core.Context(cmd, info_name=cmd.name, parent=parent)
|
|
print(cmd.get_help(ctx))
|
|
print()
|
|
commands = getattr(cmd, "commands", {})
|
|
for sub in commands.values():
|
|
recursive_help(sub, ctx)
|
|
|
|
|
|
@app.cli.command()
|
|
def entreprises_reset_db():
|
|
"""Remet a zéro les tables du module relations entreprises"""
|
|
click.confirm(
|
|
"This will erase all data from the blueprint 'entreprises'.\nAre you sure you want to continue?",
|
|
abort=True,
|
|
)
|
|
db.reflect()
|
|
try:
|
|
entreprises_reset_database()
|
|
except:
|
|
db.session.rollback()
|
|
raise
|
|
|
|
|
|
@app.cli.command()
|
|
def dumphelp():
|
|
"""Génère la page d'aide complète pour la doc."""
|
|
recursive_help(app.cli)
|
|
|
|
|
|
@app.cli.command()
|
|
@click.option("-h", "--host", default="127.0.0.1", help="The interface to bind to.")
|
|
@click.option("-p", "--port", default=5000, help="The port to bind to.")
|
|
@click.option(
|
|
"--length",
|
|
default=25,
|
|
help="Number of functions to include in the profiler report.",
|
|
)
|
|
@click.option(
|
|
"--profile-dir", default=None, help="Directory where profiler data files are saved."
|
|
)
|
|
def profile(host, port, length, profile_dir):
|
|
"""Start the application under the code profiler."""
|
|
from werkzeug.middleware.profiler import ProfilerMiddleware
|
|
from werkzeug.serving import run_simple
|
|
|
|
app.wsgi_app = ProfilerMiddleware(
|
|
app.wsgi_app, restrictions=[length], profile_dir=profile_dir
|
|
)
|
|
run_simple(
|
|
host, port, app, use_debugger=False
|
|
) # use run_simple instead of app.run()
|