1
0
forked from ScoDoc/ScoDoc
ScoDoc/scodoc.py

604 lines
19 KiB
Python
Raw Permalink Normal View History

2021-12-18 09:47:45 +01:00
# -*- coding: UTF-8 -*-
2021-05-29 18:22:51 +02:00
"""Application Flask: ScoDoc
"""
from pprint import pprint as pp
2021-09-13 23:06:42 +02:00
import re
import sys
2021-05-29 18:22:51 +02:00
import click
import flask
2021-06-24 10:59:03 +02:00
from flask.cli import with_appcontext
2021-10-20 16:47:41 +02:00
from flask.templating import render_template
from flask_login import login_user, logout_user, current_user
2022-02-13 15:19:39 +01:00
import psycopg2
import sqlalchemy
2021-09-13 16:11:33 +02:00
2021-08-10 12:57:38 +02:00
from app import create_app, cli, db
from app import initialize_scodoc_database
from app import clear_scodoc_cache
2021-09-13 16:11:33 +02:00
from app import models
2021-06-24 10:59:03 +02:00
2021-05-29 18:22:51 +02:00
from app.auth.models import User, Role, UserRole
from app.scodoc.sco_logos import make_logo_local
2022-08-02 16:21:41 +02:00
from app.models import departements
from app.models import Formation, UniteEns, Matiere, Module
from app.models import FormSemestre, FormSemestreInscription
2022-08-02 16:21:41 +02:00
from app.models import GroupDescr
from app.models import Identite
2022-08-02 16:21:41 +02:00
from app.models import ModuleImpl, ModuleImplInscription
from app.models import Partition
from app.models.evaluations import Evaluation
from app.entreprises.models import entreprises_reset_database
2021-09-13 16:11:33 +02:00
from app.scodoc.sco_permissions import Permission
from app.views import notes, scolar
import tools
from tools.fakedatabase import create_test_api_database
2021-05-29 18:22:51 +02:00
from config import RunningConfig
2021-05-29 18:22:51 +02:00
# Application object built from the running configuration; every command and
# the shell context processor defined in this file are attached to it.
app = create_app(RunningConfig)
2021-05-29 18:22:51 +02:00
# Hand the application to the `cli` object imported from the `app` package
# (presumably this registers additional CLI commands -- confirm in app/cli).
cli.register(app)
@app.shell_context_processor
def make_shell_context():
    """Return the names made available in the interactive ``flask shell``.

    The numeric libraries and a few project modules are imported inside the
    function, so they are only loaded when the shell context is built.
    """
    import numpy as np
    import pandas as pd
    import app as mapp  # the "app" package (the global `app` is the Flask instance)
    from app.scodoc import notesdb as ndb
    from app.comp import res_sem
    from app.scodoc import sco_utils as scu

    # Keyword form of the mapping: each keyword is the name exposed in the shell.
    return dict(
        ctx=app.test_request_context(),
        current_app=flask.current_app,
        current_user=current_user,
        db=db,
        Evaluation=Evaluation,
        flask=flask,
        Formation=Formation,
        FormSemestre=FormSemestre,
        FormSemestreInscription=FormSemestreInscription,
        GroupDescr=GroupDescr,
        Identite=Identite,
        login_user=login_user,
        logout_user=logout_user,
        mapp=mapp,
        models=models,
        Matiere=Matiere,
        Module=Module,
        ModuleImpl=ModuleImpl,
        ModuleImplInscription=ModuleImplInscription,
        Partition=Partition,
        ndb=ndb,
        notes=notes,
        np=np,
        pd=pd,
        Permission=Permission,
        pp=pp,
        Role=Role,
        res_sem=res_sem,
        scolar=scolar,
        ScolarNews=models.ScolarNews,
        scu=scu,
        UniteEns=UniteEns,
        User=User,
        UserRole=UserRole,
    )
# ctx.push()
# admin = User.query.filter_by(user_name="admin").first()
# login_user(admin)
2021-05-29 18:22:51 +02:00
@app.cli.command()
@click.option("--erase/--no-erase", default=False)
def sco_db_init(erase=False):  # sco-db-init
    """Initialize the database.
    Starts from an existing database and create all
    the necessary SQL tables and functions.
    """
    # The docstring above is the help text shown by click for this command.
    # Refuses to run when SCODOC_ADMIN_MAIL is missing from the app config;
    # otherwise delegates to initialize_scodoc_database(erase=...).
    # NOTE(review): click discards a command's return value when invoked from
    # the command line, so the 100 below presumably never becomes the process
    # exit status -- confirm whether sys.exit(100) was intended.
    if not app.config.get("SCODOC_ADMIN_MAIL"):
        # Newline-terminated, like every other error message in this file.
        sys.stderr.write(
            "La variable SCODOC_ADMIN_MAIL n'est pas positionnée: vérifier votre .env\n"
        )
        return 100
    initialize_scodoc_database(erase=erase)
2021-05-29 18:22:51 +02:00
@app.cli.command()
def user_db_clear():
    """Erase all users and roles from the database !"""
    # Announce the operation, then let the helper ask for confirmation and
    # perform the deletion.
    click.echo(message="Erasing the users database !")
    _clear_users_db()
2021-05-29 18:22:51 +02:00
def _clear_users_db():
    """Delete every row of the ``UserRole`` and ``User`` tables.

    Asks for an interactive confirmation first (``abort=True``: a negative
    answer aborts the command). Rows are deleted, the tables themselves are
    not dropped, and ``Role`` rows are not touched here. On any failure the
    session is rolled back and the error is re-raised.
    """
    click.confirm(
        "This will erase all users and roles.\nAre you sure you want to continue?",
        abort=True,
    )
    db.reflect()
    try:
        # Associations first, then the users they point to.
        for model in (UserRole, User):
            db.session.query(model).delete()
        db.session.commit()
    except BaseException:
        # Equivalent to a bare except: undo the partial work, then propagate.
        db.session.rollback()
        raise
@app.cli.command()
@click.argument("username")
@click.argument("role")
@click.argument("dept")
@click.option("-n", "--nom", "nom")
@click.option("-p", "--prenom", "prenom")
def user_create(username, role, dept, nom=None, prenom=None):  # user-create
    "Create a new user"
    # Creates the account `username` and gives it the existing role `role` in
    # department `dept`. The special value "@all" is stored as dept=None.
    # Writes an error to stderr and returns 1 (unknown role) or 2 (user name
    # already taken) without touching the database.
    named_role = Role.get_named_role(role)
    if not named_role:
        sys.stderr.write(f"user_create: role {role} does not exist\n")
        return 1

    existing = User.query.filter_by(user_name=username).first()
    if existing:
        sys.stderr.write(f"user_create: user {existing} already exists\n")
        return 2

    target_dept = None if dept == "@all" else dept
    new_user = User(user_name=username, dept=target_dept, nom=nom, prenom=prenom)
    new_user.add_role(named_role, target_dept)
    db.session.add(new_user)
    db.session.commit()
    click.echo(
        f"created user, login: {new_user.user_name}, "
        f"with role {named_role} in dept. {target_dept}"
    )
@app.cli.command()
@click.argument("username")
def user_delete(username):  # user-delete
    "Try to delete this user. Fails if it's associated to some scodoc objects."
    # Returns 2 when the user does not exist, 1 when the commit is rejected
    # by an integrity / foreign-key error; prints a confirmation otherwise.
    target = User.query.filter_by(user_name=username).first()
    if not target:
        sys.stderr.write(f"user_delete: user {username} not found\n")
        return 2

    db.session.delete(target)
    try:
        db.session.commit()
    except (sqlalchemy.exc.IntegrityError, psycopg2.errors.ForeignKeyViolation):
        sys.stderr.write(
            f"\nuser_delete: ne peux pas supprimer l'utilisateur {username}\n"
            "car il est associé à des objets dans ScoDoc (modules, notes, ...).\n"
        )
        return 1
    else:
        click.echo(f"deleted user, login: {username}")
@app.cli.command()
@click.argument("username")
@click.password_option()
def user_password(username, password=None):  # user-password
    "Set (or change) user's password"
    # The password comes from click's password option. Writes an error to
    # stderr and returns 1 when the password is empty or the user is unknown;
    # otherwise stores the new password and commits.
    if not password:
        # Newline added: this was the only error message left unterminated.
        sys.stderr.write("user_password: missing password\n")
        return 1
    u = User.query.filter_by(user_name=username).first()
    if not u:
        sys.stderr.write(f"user_password: user {username} does not exists\n")
        return 1
    u.set_password(password)
    db.session.add(u)
    db.session.commit()
    click.echo(f"changed password for user {u}")
2021-09-13 23:06:42 +02:00
@app.cli.command()
@click.argument("rolename")
@click.argument("permissions", nargs=-1)
def create_role(rolename, permissions):  # create-role
    """Create a new role"""
    # `rolename` must be purely alphanumeric; `permissions` is a (possibly
    # empty) sequence of permission names, all of which must be known.
    # Every failure writes to stderr and returns 1 before anything is stored.

    # Check rolename
    if not re.match(r"^[a-zA-Z0-9]+$", rolename):
        sys.stderr.write(f"create_role: invalid rolename {rolename}\n")
        return 1

    # Check permissions
    permission_list = []
    for permission_name in permissions:
        perm = Permission.get_by_name(permission_name)
        if not perm:
            # Report the name the user typed, not the failed lookup result.
            sys.stderr.write(
                f"create_role: invalid permission name {permission_name}\n"
            )
            sys.stderr.write(
                f"\tavailable permissions: {', '.join(Permission.permission_by_name)}.\n"
            )
            return 1
        permission_list.append(perm)

    role = Role.query.filter_by(name=rolename).first()
    if role:
        sys.stderr.write(f"create_role: role {rolename} already exists\n")
        return 1

    role = Role(name=rolename)
    for perm in permission_list:
        role.add_permission(perm)
    db.session.add(role)
    db.session.commit()
2022-07-24 07:14:31 +02:00
@app.cli.command()
def list_roles():  # list-roles
    """List all defined roles"""
    # One line per role, using the role's own string representation.
    for defined_role in Role.query:
        print(defined_role)
2021-09-13 16:11:33 +02:00
@app.cli.command()
@click.argument("rolename")
@click.option("-a", "--add", "addpermissionname")
@click.option("-r", "--remove", "removepermissionname")
def edit_role(rolename, addpermissionname=None, removepermissionname=None):  # edit-role
    """Add [-a] and/or remove [-r] a permission to/from a role.
    In ScoDoc, permissions are not associated to users but to roles.
    Each user has a set of roles in each departement.
    Example: `flask edit-role -a ScoEditApo Ens`
    """
    # Resolve the requested permission names first (add, then remove): an
    # unknown name stops the command with return value 1 before the role is
    # even looked up. An option that was not given resolves to None.
    resolved = []
    for requested in (addpermissionname, removepermissionname):
        permission = None
        if requested:
            permission = Permission.get_by_name(requested)
            if not permission:
                sys.stderr.write(
                    f"edit_role: permission {requested} does not exists\n"
                )
                return 1
        resolved.append(permission)
    perm_to_add, perm_to_remove = resolved

    role = Role.query.filter_by(name=rolename).first()
    if not role:
        sys.stderr.write(f"edit_role: role {rolename} does not exists\n")
        return 1

    if perm_to_add:
        role.add_permission(perm_to_add)
        click.echo(f"adding permission {addpermissionname} to role {rolename}")
    if perm_to_remove:
        role.remove_permission(perm_to_remove)
        click.echo(f"removing permission {removepermissionname} from role {rolename}")
    # Only write to the database when something actually changed.
    if perm_to_add or perm_to_remove:
        db.session.add(role)
        db.session.commit()
    # Always show the resulting role.
    print(role)
2021-06-24 10:59:03 +02:00
2022-01-03 12:31:20 +01:00
2021-12-24 09:50:25 +01:00
@app.cli.command()
@click.argument("rolename")
def delete_role(rolename):
    """Delete a role"""
    # Unknown role: report on stderr and return 1; otherwise delete and commit.
    doomed = Role.query.filter_by(name=rolename).first()
    if doomed is not None:
        db.session.delete(doomed)
        db.session.commit()
        return None
    sys.stderr.write(f"delete_role: role {rolename} does not exists\n")
    return 1
2021-06-24 10:59:03 +02:00
2022-01-03 12:31:20 +01:00
2021-12-21 00:04:42 +01:00
@app.cli.command()
@click.argument("username")
@click.option("-d", "--dept", "dept_acronym")
@click.option("-a", "--add", "add_role_name")
@click.option("-r", "--remove", "remove_role_name")
def user_role(username, dept_acronym=None, add_role_name=None, remove_role_name=None):
    """Add or remove a role to the given user in the given dept"""
    # Return values: 1 = unknown user, 2 = unknown department or role.
    account: User = User.query.filter_by(user_name=username).first()
    if not account:
        sys.stderr.write(f"user_role: user {username} does not exists\n")
        return 1

    # With no option at all, display the roles currently held by the user,
    # sorted by department (roles without department first) then role name.
    if all(opt is None for opt in (dept_acronym, add_role_name, remove_role_name)):
        print(f"Roles for user {account.user_name}")
        ordered = sorted(
            account.user_roles, key=lambda ur: (ur.dept or "", ur.role.name)
        )
        for assignment in ordered:
            print(f"{assignment.dept or 'tous'}:\t{assignment.role.name}")

    # The department is only checked for existence; roles are attached to its
    # acronym, not to the object fetched here.
    if dept_acronym:
        departement = models.Departement.query.filter_by(acronym=dept_acronym).first()
        if departement is None:
            sys.stderr.write(f"Erreur: le departement {dept_acronym} n'existe pas !\n")
            return 2

    if add_role_name:
        role_to_add = Role.query.filter_by(name=add_role_name).first()
        if role_to_add is None:
            sys.stderr.write(
                f"user_role: role {add_role_name} does not exists\n"
                "(use list-roles to display existing roles)\n"
            )
            return 2
        account.add_role(role_to_add, dept_acronym)

    if remove_role_name:
        role_to_remove = Role.query.filter_by(name=remove_role_name).first()
        if role_to_remove is None:
            sys.stderr.write(f"user_role: role {remove_role_name} does not exists\n")
            return 2
        assignment = UserRole.query.filter(
            UserRole.role == role_to_remove,
            UserRole.user == account,
            UserRole.dept == dept_acronym,
        ).first()
        # Removing a role the user does not hold is silently accepted.
        if assignment:
            db.session.delete(assignment)

    db.session.commit()
def abort_if_false(ctx, param, value):
    """Click option callback: abort the command unless ``value`` is truthy.

    ``param`` is part of the callback signature expected by click and is not
    used. Nothing is returned.
    """
    if value:
        return
    ctx.abort()
2021-06-24 10:59:03 +02:00
@app.cli.command()
@click.option(
    "-y",
    "--yes",
    is_flag=True,
    callback=abort_if_false,
    expose_value=False,
    prompt=(
        "Attention: Cela va effacer toutes les données du département\n"
        "(étudiants, notes, formations, etc).\n"
        "Voulez-vous vraiment continuer ?\n"
    ),
)
@click.option(
    "-f",
    "--force",
    is_flag=True,
    help="ignore non-existing departement",
)
@click.argument("dept")
def delete_dept(dept, force=False):  # delete-dept
    """Delete existing departement"""
    # The --yes option carries the confirmation prompt: its callback aborts
    # the command unless the answer is positive.
    # Returns 2 when the department is unknown and --force was not given,
    # 0 otherwise (including the "unknown but forced" case, which does nothing).
    from app.scodoc import notesdb as ndb
    from app.scodoc import sco_dept

    db.reflect()
    ndb.open_db_connection()
    departement = models.Departement.query.filter_by(acronym=dept).first()
    if departement is None and not force:
        sys.stderr.write(f"Erreur: le departement {dept} n'existe pas !\n")
        return 2
    if departement:
        sco_dept.delete_dept(departement.id)
        db.session.commit()
    return 0
@app.cli.command()
@click.argument("dept")
def create_dept(dept):  # create-dept
    "Create new departement"
    # Delegates to app.models.departements; the created object is not needed.
    departements.create_dept(dept)
    return 0
@app.cli.command()
@click.argument("depts", nargs=-1)
def list_depts(depts=""):  # list-depts
    """If dept exists, print it, else nothing.
    Called without arguments, list all depts along with their ids.
    """
    # Output: "<id>\t<acronym>", ordered by id. `depts` acts as an optional
    # filter on acronyms.
    by_id = models.Departement.query.order_by(models.Departement.id)
    for departement in by_id:
        if depts and departement.acronym not in depts:
            continue
        print(f"{departement.id}\t{departement.acronym}")
2021-10-06 13:53:09 +02:00
@app.cli.command()
@click.option(
    "-n",
    "--name",
    is_flag=True,
    help="show database name instead of connexion string (required for dropdb/createdb commands)",
)
def scodoc_database(name):  # scodoc-database
    """print the database connexion string"""
    # With --name, only the part after the last "/" of the URI is printed.
    uri = app.config["SQLALCHEMY_DATABASE_URI"]
    shown = uri.split("/")[-1] if name else uri
    print(shown)
2021-07-05 00:07:17 +02:00
@app.cli.command()
@with_appcontext
def import_scodoc7_users():  # import-scodoc7-users
    """Import users defined in ScoDoc7 postgresql database into ScoDoc 9
    The old database SCOUSERS must be alive and readable by the current user.
    This script is typically run as unix user "scodoc".
    The original SCOUSERS database is left unmodified.
    """
    # The import helper returns the list of warning messages it produced;
    # print a separator, their count, then the messages themselves.
    warnings = tools.import_scodoc7_user_db()
    report = (
        "----",
        f"import terminé: {len(warnings)} warnings\n",
        "\n".join(warnings) + "\n",
    )
    for chunk in report:
        click.echo(chunk)
@app.cli.command()
@click.argument("dept")
@click.argument("dept_db_name")
@with_appcontext
def import_scodoc7_dept(dept: str, dept_db_name: str = ""):  # import-scodoc7-dept
    """Import département ScoDoc 7: dept: InfoComm, dept_db_name: SCOINFOCOMM"""
    # The database name is turned into a host-less postgresql URI before
    # being handed to the import tool.
    tools.import_scodoc7_dept(dept, f"postgresql:///{dept_db_name}")
2021-07-29 11:30:13 +03:00
2021-09-16 21:42:45 +02:00
@app.cli.command()
@click.argument("dept", default="")
@with_appcontext
def migrate_scodoc7_dept_archives(dept: str):  # migrate-scodoc7-dept-archives
    """Post-migration: renomme les archives en fonction des id de ScoDoc 9"""
    # Post-migration step: rename archives according to the ScoDoc 9 ids.
    # `dept` defaults to the empty string when the argument is omitted.
    tools.migrate_scodoc7_dept_archives(dept)
2021-09-16 21:42:45 +02:00
@app.cli.command()
@click.argument("dept", default="")
@with_appcontext
def migrate_scodoc7_dept_logos(dept: str = ""):  # migrate-scodoc7-dept-logos
    """Post-migration: renomme les logos en fonction des id / dept de ScoDoc 9"""
    # Post-migration step: rename logos according to the ScoDoc 9 ids / depts.
    # `dept` defaults to the empty string when the argument is omitted.
    tools.migrate_scodoc7_dept_logos(dept)
2021-11-14 10:43:55 +01:00
@app.cli.command()
@click.argument("logo", default=None)
@click.argument("dept", default=None)
@with_appcontext
def localize_logo(logo: str = None, dept: str = None):  # localize-logo
    """Make local to a dept a global logo (both logo and dept names are mandatory)"""
    # "header" and "footer" are refused here: the user is told to go through
    # the configuration form instead.
    reserved = ("header", "footer")
    if logo not in reserved:
        make_logo_local(logoname=logo, dept_name=dept)
        return
    print(
        f"Can't make logo '{logo}' local: add a local version throught configuration form instead"
    )
2021-10-20 16:47:41 +02:00
@app.cli.command()
@click.argument("formsemestre_id", type=click.INT)
@click.argument("xlsfile", type=click.File("rb"))
@click.argument("zipfile", type=click.File("rb"))
def photos_import_files(formsemestre_id: int, xlsfile: str, zipfile: str):
    """Import des photos d'étudiants à partir d'une liste excel et d'un zip avec les images."""
    # Imports student photos from an Excel list plus a zip archive of images.
    # NOTE(review): both file arguments are declared with click.File("rb"), so
    # they presumably arrive as opened binary streams rather than the `str`
    # the annotations suggest -- confirm and adjust the hints.
    # Returns 2 when the semester id is unknown.
    import app as mapp
    from app.scodoc import sco_trombino, sco_photos
    from flask_login import login_user
    from app.auth.models import get_super_admin

    sem = mapp.models.formsemestre.FormSemestre.query.get(formsemestre_id)
    if not sem:
        sys.stderr.write("photos-import-files: numéro de semestre invalide\n")
        return 2

    # Work inside a test request context, in the semester's department and
    # logged in as the super admin.
    with app.test_request_context():
        mapp.set_sco_dept(sem.departement.acronym)
        login_user(get_super_admin())

        # Each matched image is stored through sco_photos.store_photo.
        outcome = sco_trombino.zip_excel_import_files(
            xlsfile=xlsfile,
            zipfile=zipfile,
            callback=lambda etud, data, filename: sco_photos.store_photo(
                etud, data, filename
            ),
            filename_title="fichier_photo",
        )
        ignored_zipfiles, unmatched_files, stored_etud_filename = outcome

        # Plain-text report rendered from a template.
        report = render_template(
            "scolar/photos_import_files.txt",
            ignored_zipfiles=ignored_zipfiles,
            unmatched_files=unmatched_files,
            stored_etud_filename=stored_etud_filename,
        )
        print(report)
2021-07-29 11:30:13 +03:00
@app.cli.command()
@click.option("--sanitize/--no-sanitize", default=False)
@with_appcontext
def clear_cache(sanitize):  # clear-cache
    """Clear ScoDoc cache
    This cache (currently Redis) is persistent between invocation
    and it may be necessary to clear it during upgrades,
    development or tests.
    """
    click.echo("Flushing Redis cache...")
    clear_scodoc_cache()
    if not sanitize:
        return
    # --sanitize: additionally run the sanitizing pass on every formation.
    click.echo("Checking formations...")
    for formation in Formation.query:
        formation.sanitize_old_formation()
2021-09-13 23:06:42 +02:00
@app.cli.command()
def init_test_database():  # init-test-database
    """Initialise les objets en base pour les tests API
    (à appliquer sur SCODOC_TEST ou SCODOC_DEV)
    """
    # Creates the database objects used by the API tests, acting as the user
    # named "admin" inside a test request context.
    # NOTE(review): the pushed context is never popped, and login_user gets
    # None if no "admin" user exists -- confirm both are acceptable here.
    click.echo("Initialisation base de test API...")
    request_ctx = app.test_request_context()
    request_ctx.push()
    login_user(User.query.filter_by(user_name="admin").first())
    create_test_api_database.init_test_database()
2021-09-13 23:06:42 +02:00
def recursive_help(cmd, parent=None):
    """Print the help of ``cmd``, then of each of its sub-commands, recursively.

    ``parent`` is the click context of the enclosing command, or None at the
    top level. Each help text is followed by one blank line. Objects without
    a ``commands`` attribute are treated as having no sub-commands.
    """
    ctx = click.core.Context(cmd, info_name=cmd.name, parent=parent)
    print(cmd.get_help(ctx), end="\n\n")
    for sub in getattr(cmd, "commands", {}).values():
        recursive_help(sub, ctx)
@app.cli.command()
def entreprises_reset_db():
    """Remet a zéro les tables du module relations entreprises"""
    # Resets the tables of the "entreprises" blueprint after an interactive
    # confirmation (a negative answer aborts the command). On failure the
    # session is rolled back and the error is re-raised.
    click.confirm(
        "This will erase all data from the blueprint 'entreprises'.\n"
        "Are you sure you want to continue?",
        abort=True,
    )
    db.reflect()
    try:
        entreprises_reset_database()
    except BaseException:
        # Equivalent to a bare except: undo, then propagate.
        db.session.rollback()
        raise
2021-09-13 23:06:42 +02:00
@app.cli.command()
def dumphelp():
    """Génère la page d'aide complète pour la doc."""
    # Prints the help of the whole command tree, starting at the app's CLI group.
    recursive_help(cmd=app.cli)
2021-10-04 21:57:35 +02:00
@app.cli.command()
@click.option("-h", "--host", default="127.0.0.1", help="The interface to bind to.")
@click.option("-p", "--port", default=5000, help="The port to bind to.")
@click.option(
    "--length",
    default=25,
    help="Number of functions to include in the profiler report.",
)
@click.option(
    "--profile-dir", default=None, help="Directory where profiler data files are saved."
)
def profile(host, port, length, profile_dir):
    """Start the application under the code profiler."""
    from werkzeug.middleware.profiler import ProfilerMiddleware
    from werkzeug.serving import run_simple

    # Wrap the WSGI entry point so that requests go through the profiler.
    profiled_wsgi = ProfilerMiddleware(
        app.wsgi_app, restrictions=[length], profile_dir=profile_dir
    )
    app.wsgi_app = profiled_wsgi
    # use run_simple instead of app.run()
    run_simple(host, port, app, use_debugger=False)