ScoDoc/app/__init__.py

687 lines
23 KiB
Python
Executable File

# -*- coding: UTF-8 -*-
# pylint: disable=invalid-name
import base64
import datetime
import json
import os
import socket
import sys
import time
import traceback
import logging
from logging.handlers import SMTPHandler, WatchedFileHandler
from threading import Thread
import warnings
from celery import Celery, Task
from flask import current_app, g, request
from flask import Flask
from flask import abort, flash, has_request_context
from flask import render_template
from flask.logging import default_handler
from flask_caching import Cache
from flask_json import FlaskJSON, json_response
from flask_login import LoginManager, current_user
from flask_mail import Mail
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from jinja2 import select_autoescape
import numpy as np
import psycopg2
from psycopg2.extensions import AsIs as psycopg2_AsIs
import sqlalchemy as sa
import werkzeug.debug
from wtforms.fields import HiddenField
from flask_cas import CAS
from app.scodoc.sco_exceptions import (
AccessDenied,
ScoBugCatcher,
ScoException,
ScoGenError,
ScoInvalidCSRF,
ScoPDFFormatError,
ScoValueError,
APIInvalidParams,
)
from app.scodoc.sco_vdi import ApoEtapeVDI
from config import DevConfig
import sco_version
# Flask extension singletons: created unbound here and attached to the
# application by create_app() through their init_app() methods.
db = SQLAlchemy()
migrate = Migrate(compare_type=True)

login = LoginManager()
login.login_view = "auth.login"
login.login_message = "Identifiez-vous pour accéder à cette page."

mail = Mail()

# Cache backend taken from the environment; "RedisCache" is used when unset.
CACHE_TYPE = os.environ.get("CACHE_TYPE")
# see https://flask-caching.readthedocs.io/en/latest/index.html#configuring-flask-caching
cache = Cache(
    config=dict(
        CACHE_TYPE=CACHE_TYPE or "RedisCache",
        # by default (timeout 0), entries never expire
        CACHE_DEFAULT_TIMEOUT=os.environ.get("CACHE_DEFAULT_TIMEOUT") or 0,
    )
)
# NumPy & Psycopg2 (necessary with NumPy 2.0)
# Will probably have to change when moving to psycopg 3.2.
def adapt_numpy_scalar(numpy_scalar):
    """Adapt a NumPy numeric scalar for psycopg2.

    Finite values are wrapped in AsIs unchanged. NaN and the infinities
    have no bare SQL spelling, so they are emitted as the quoted literals
    ``'NaN'``, ``'Infinity'`` and ``'-Infinity'``.
    """
    if np.isnan(numpy_scalar):
        return psycopg2_AsIs("'NaN'")
    if np.isinf(numpy_scalar):
        # str(np.inf) is "inf": unquoted, the server would not read it as a number.
        return psycopg2_AsIs("'Infinity'" if numpy_scalar > 0 else "'-Infinity'")
    return psycopg2_AsIs(numpy_scalar)


psycopg2.extensions.register_adapter(np.float32, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.float64, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.int32, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.int64, adapt_numpy_scalar)
def handle_sco_value_error(exc):
    """Render the error page showing the exception message (HTTP status 404)."""
    page = render_template("sco_value_error.j2", exc=exc)
    return page, 404
def handle_access_denied(exc):
    """Render the "access denied" error page (HTTP status 403)."""
    page = render_template("error_access_denied.j2", exc=exc)
    return page, 403
def handle_invalid_csrf(exc):
    """Handle a form submitted with an invalid CSRF token.

    The current user is logged out, then the CSRF error page is returned
    with HTTP status 404.
    """
    # Imported locally, not at module level (presumably to avoid a circular
    # import; confirm).
    from app import auth as auth_package

    auth_package.logic.logout()
    page = render_template("error_csrf.j2", exc=exc)
    return page, 404
# PDF format errors are reported with the same error page as ScoValueError.
handle_pdf_format_error = handle_sco_value_error
def internal_server_error(exc):
    """Handle ScoDoc bugs (HTTP 500 errors).

    Flushes all caches, then renders the 500 error page with the ScoDoc
    version, the current date, the exception, the active traceback
    (URL-safe base64) and the request URL. The 500 status is set explicitly.
    """
    from app.scodoc import sco_utils as scu

    # Invalidate all caches
    log("internal_server_error: clearing caches")
    clear_scodoc_cache()

    encoding = scu.SCO_ENCODING
    context = {
        "SCOVERSION": sco_version.SCOVERSION,
        "date": datetime.datetime.now().isoformat(),
        "exc": exc,
        "traceback_str_base64": base64.urlsafe_b64encode(
            traceback.format_exc().encode(encoding)
        ).decode(encoding),
        "request_url": request.url,
        "scu": scu,
    }
    page = render_template("error_500.j2", **context)
    return page, 500
def handle_sco_bug(exc):
    """Handle a bug (usually rare) on which developers want more information.

    On testing or debug servers, ScoException is raised. Otherwise a thread
    running _async_dump is started and the standard 500 error page is
    returned.
    """
    config = current_app.config
    if config["TESTING"] or config["DEBUG"]:
        raise ScoException  # for development servers only

    # Hand the thread the underlying app object (not the proxy) and the URL.
    dump_thread = Thread(
        target=_async_dump,
        args=(current_app._get_current_object(), request.url),
    )
    dump_thread.start()
    return internal_server_error(exc)
def _async_dump(app, request_url: str):
    """Dump and send the database; thread target started by handle_sco_bug.

    Runs inside an application context of `app`, after opening a database
    connection through ndb. A ScoValueError raised by the dump is ignored.
    """
    from app.scodoc.sco_dump_db import sco_dump_and_send_db as dump_and_send

    with app.app_context():
        ndb.open_db_connection()
        try:
            dump_and_send("ScoBugCatcher", request_url=request_url)
        except ScoValueError:
            # deliberately ignored
            pass
def handle_invalid_usage(error):
    """Build the JSON response for an API error.

    The body is `error.to_dict()` and the HTTP status is
    `error.status_code`.
    """
    payload = error.to_dict()
    response = json_response(data_=payload)
    response.status_code = error.status_code
    return response
# JSON ENCODING
# used by some internal functions
# the API is now using flask_json, NOT THIS ENCODER
class ScoDocJSONEncoder(json.JSONEncoder):
    """JSON encoder that also handles dates and ApoEtapeVDI objects."""

    def default(self, o):  # pylint: disable=E0202
        """Encode dates/datetimes as ISO strings and ApoEtapeVDI via str().

        Any other object is delegated to the standard encoder.
        """
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()
        if isinstance(o, ApoEtapeVDI):
            return str(o)
        return super().default(o)
def render_raw_html(template_filename: str, **args) -> str:
    """Load and render an HTML file _without_ using Flask templating.

    Necessary for the 503 error message, when the DB is down and Flask may
    be broken.

    The file is looked up in the application's template folder (under
    SCODOC_DIR/app) and filled in with ``str.format(**args)``, so any
    literal brace in the file must be doubled.

    The file is read as UTF-8 explicitly, so the result does not depend on
    the locale of the server process.
    """
    template_path = os.path.join(
        current_app.config["SCODOC_DIR"],
        "app",
        current_app.template_folder,
        template_filename,
    )
    with open(template_path, encoding="utf-8") as f:
        return f.read().format(**args)
def postgresql_server_error(e):
    """Handle a PostgreSQL server connection error (see notesdb.open_db_connection).

    Returns the raw-rendered 503 page with HTTP status 503.
    """
    page = render_raw_html("error_503.j2", SCOVERSION=sco_version.SCOVERSION)
    return page, 503
class LogRequestFormatter(logging.Formatter):
    """Formatter adding the request URL and remote address to log records."""

    def format(self, record):
        """Attach url, remote_addr, sco_user and sco_admin_mail, then format.

        Outside a request context, url and remote_addr are None and the
        admin mail is a placeholder string.
        """
        in_request = has_request_context()
        record.url = request.url if in_request else None
        record.remote_addr = request.remote_addr if in_request else None
        record.sco_user = current_user
        record.sco_admin_mail = (
            current_app.config["SCODOC_ADMIN_MAIL"]
            if in_request
            else "(pas de requête)"
        )
        return super().format(record)
class LogExceptionFormatter(logging.Formatter):
    """Formatter for exceptions: adds request details to log records."""

    def format(self, record):
        """Attach request details (URL, address, referrer, method, params).

        Outside a request context, all request fields are None and the
        admin mail is a placeholder string.
        """
        in_request = has_request_context()
        if in_request:
            record.url = request.url
            # Use the X-Forwarded-For header when present.
            record.remote_addr = request.environ.get(
                "HTTP_X_FORWARDED_FOR", request.remote_addr
            )
            record.http_referrer = request.referrer
            record.http_method = request.method
            # Form data is truncated to 2048 characters; query args are not.
            record.http_params = (
                str(request.args)
                if request.method == "GET"
                else str(request.form)[:2048]
            )
        else:
            for field_name in (
                "url",
                "remote_addr",
                "http_referrer",
                "http_method",
                "http_params",
            ):
                setattr(record, field_name, None)
        record.sco_user = current_user
        record.sco_admin_mail = (
            current_app.config["SCODOC_ADMIN_MAIL"]
            if in_request
            else "(pas de requête)"
        )
        return super().format(record)
class ScoSMTPHandler(SMTPHandler):
    """SMTP log handler whose mail subject describes the logged error."""

    def getSubject(self, record: logging.LogRecord) -> str:
        """Return the mail subject for `record`.

        When the record carries an exception with a traceback, the subject
        names the exception class and the function and file of the last
        traceback frame.

        Otherwise (e.g. an error logged without exc_info) the subject is
        built from the record itself: exception class or level name, plus
        the function and file of the logging call. Without this fallback,
        such records raise while the subject is computed.
        """
        exc_type, _, tb = record.exc_info or (None, None, None)
        frames = traceback.extract_tb(tb) if tb is not None else []
        if exc_type is not None and frames:
            frame_summary = frames[-1]
            return f"ScoExc({sco_version.SCOVERSION}): {exc_type.__name__} in {frame_summary.name} {frame_summary.filename}"
        kind = exc_type.__name__ if exc_type is not None else record.levelname
        return f"ScoExc({sco_version.SCOVERSION}): {kind} in {record.funcName} {record.pathname}"
class ReverseProxied:
    """WSGI adapter so that all computed URLs use https, except in dev.

    The HTTP_X_FORWARDED_PROTO variable is set by our nginx configuration.
    """

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        """Apply the forwarded scheme, if any, then call the wrapped app."""
        forwarded_proto = environ.get("HTTP_X_FORWARDED_PROTO")
        if forwarded_proto:
            # or force https here?
            environ["wsgi.url_scheme"] = forwarded_proto
        return self.app(environ, start_response)
def celery_init_app(app: Flask) -> Celery:
    """Create the Celery application attached to the Flask `app`.

    Tasks run inside an application context of `app`. The Celery app is
    configured from app.config["CELERY"], set as the default Celery app and
    stored in app.extensions["celery"].
    See https://flask.palletsprojects.com/en/latest/patterns/celery
    """

    class FlaskTask(Task):
        "Task in Flask context"

        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery = Celery(app.name, task_cls=FlaskTask)
    celery.config_from_object(app.config["CELERY"])
    celery.set_default()
    app.extensions["celery"] = celery
    return celery
def create_app(config_class=DevConfig) -> Flask:
    """Create and configure the ScoDoc Flask application.

    In order: load `config_class`; set up CAS, the reverse-proxy WSGI
    adapter, Celery and JSON encoding; initialise the extensions (db,
    migrations, login, mail, cache); register error handlers and
    blueprints; configure Jinja2 and logging; register the bulletin
    generator classes; finally apply the CAS configuration.

    Args:
        config_class: configuration object given to app.config.from_object.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
    app.config.from_object(config_class)
    from app.auth import cas

    CAS(app, url_prefix="/cas", configuration_function=cas.set_cas_configuration)
    app.wsgi_app = ReverseProxied(app.wsgi_app)
    # --- Celery
    celery_init_app(app)
    # --- JSON
    app_json = FlaskJSON(app)

    @app_json.encoder
    def scodoc_json_encoder(o):
        "Override default date encoding (RFC 822) and use ISO"
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()

    # Keep the order of objects in JSON output,
    # e.g. the order of the UEs in the bulletins
    app.json.sort_keys = False
    # Avoid logging every request in our log
    logging.getLogger("werkzeug").disabled = True
    app.logger.setLevel(app.config["LOG_LEVEL"])
    if app.config["TESTING"] or app.config["DEBUG"]:
        # Stop on all warnings, except
        # flask_sqlalchemy/query (deprecation issue with model.get())
        # NOTE(review): as written, this call sets the "error" action for the
        # given module pattern; confirm it matches the intent stated above.
        warnings.filterwarnings("error", module="flask_sqlalchemy/query")
        # (an "ignore" filter on json/provider.py was tried: no effect in tests)
    if app.config["DEBUG"]:
        # since werkzeug logs were disabled above,
        # we print the debugger PIN ourselves in debug mode:
        print(
            f""" * Debugger is active!
* Debugger PIN: {werkzeug.debug.get_pin_and_cookie_name(app)[0]}
"""
        )
    # Check/create the symbolic link used for static URLs
    link_filename = f"{app.root_path}/static/links/{sco_version.SCOVERSION}"
    if not os.path.exists(link_filename):
        app.logger.info(f"creating symlink {link_filename}")
        os.symlink("..", link_filename)
    db.init_app(app)
    migrate.init_app(app, db)
    login.init_app(app)
    mail.init_app(app)
    app.extensions["mail"].debug = 0  # disable copy of mails to stderr
    cache.init_app(app)
    # sco_cache is imported at the bottom of this module
    sco_cache.CACHE = cache
    if CACHE_TYPE:  # non default
        app.logger.info(f"CACHE_TYPE={CACHE_TYPE}")
    # --- Error handlers
    app.register_error_handler(ScoGenError, handle_sco_value_error)
    app.register_error_handler(ScoValueError, handle_sco_value_error)
    app.register_error_handler(ScoBugCatcher, handle_sco_bug)
    app.register_error_handler(ScoInvalidCSRF, handle_invalid_csrf)
    app.register_error_handler(ScoPDFFormatError, handle_pdf_format_error)
    app.register_error_handler(AccessDenied, handle_access_denied)
    app.register_error_handler(500, internal_server_error)
    app.register_error_handler(503, postgresql_server_error)
    app.register_error_handler(APIInvalidParams, handle_invalid_usage)
    # --- Blueprints
    from app.auth import bp as auth_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")
    from app.entreprises import bp as entreprises_bp

    app.register_blueprint(entreprises_bp, url_prefix="/ScoDoc/entreprises")
    from app.views import scodoc_bp
    from app.views import scolar_bp
    from app.views import notes_bp
    from app.views import users_bp
    from app.views import absences_bp
    from app.views import assiduites_bp
    from app.api import api_bp
    from app.api import api_web_bp

    # Jinja2 configuration
    # Enable autoescaping of all templates, including .j2
    app.jinja_env.autoescape = select_autoescape(default_for_string=True, default=True)
    app.jinja_env.trim_blocks = True
    app.jinja_env.lstrip_blocks = True
    app.jinja_env.globals["is_hidden_field"] = lambda field: isinstance(
        field, HiddenField
    )
    # https://scodoc.fr/ScoDoc
    app.register_blueprint(scodoc_bp)
    # https://scodoc.fr/ScoDoc/RT/Scolarite/...
    app.register_blueprint(scolar_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Notes/...
    app.register_blueprint(notes_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Notes")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Users/...
    app.register_blueprint(users_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Users")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Absences/...
    app.register_blueprint(
        absences_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Absences"
    )
    app.register_blueprint(
        assiduites_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Assiduites"
    )
    app.register_blueprint(api_bp, url_prefix="/ScoDoc/api")
    app.register_blueprint(api_web_bp, url_prefix="/ScoDoc/<scodoc_dept>/api")
    # --- Log formatters
    scodoc_log_formatter = LogRequestFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s"
    )
    # the additional fields are set by LogExceptionFormatter
    scodoc_exc_formatter = LogExceptionFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s\n"
        "Referrer: %(http_referrer)s\n"
        "Method: %(http_method)s\n"
        "Params: %(http_params)s\n"
        "Admin mail: %(sco_admin_mail)s\n"
    )
    if not app.testing:
        if not app.debug:
            # --- Log configuration for PRODUCTION
            # Remove the default logger, which goes to stderr and pollutes
            # the system logs
            app.logger.removeHandler(default_handler)
            # --- Mail ERROR and CRITICAL messages
            if app.config["MAIL_SERVER"]:
                auth = None
                if app.config["MAIL_USERNAME"] or app.config["MAIL_PASSWORD"]:
                    auth = (app.config["MAIL_USERNAME"], app.config["MAIL_PASSWORD"])
                secure = None
                if app.config["MAIL_USE_TLS"]:
                    # empty tuple: SMTPHandler uses TLS, without key/cert files
                    secure = ()
                # NOTE(review): host_name is not used in the code below.
                host_name = socket.gethostname()
                mail_handler = ScoSMTPHandler(
                    mailhost=(app.config["MAIL_SERVER"], app.config["MAIL_PORT"]),
                    fromaddr=app.config["SCODOC_MAIL_FROM"],
                    toaddrs=["exception@scodoc.org"],
                    subject="ScoDoc Exception",  # unused, see ScoSMTPHandler
                    credentials=auth,
                    secure=secure,
                )
                mail_handler.setFormatter(scodoc_exc_formatter)
                mail_handler.setLevel(logging.ERROR)
                app.logger.addHandler(mail_handler)
        else:
            # For DEV logs only:
            default_handler.setFormatter(scodoc_log_formatter)
        # Log configuration for DEV and PRODUCTION
        # (also active in development mode)
        # usually /opt/scodoc-data/log/scodoc.log:
        # rotated by logrotate
        file_handler = WatchedFileHandler(
            app.config["SCODOC_LOG_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_log_formatter)
        file_handler.setLevel(logging.INFO)
        app.logger.addHandler(file_handler)
        # Log for errors (exceptions) only:
        # usually /opt/scodoc-data/log/scodoc_exc.log
        file_handler = WatchedFileHandler(
            app.config["SCODOC_ERR_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_exc_formatter)
        file_handler.setLevel(logging.ERROR)
        app.logger.addHandler(file_handler)
        app.logger.info(f"{sco_version.SCONAME} {sco_version.SCOVERSION} startup")
        app.logger.info(
            f"create_app({config_class.__name__}, {config_class.SQLALCHEMY_DATABASE_URI})"
        )
    # ---- SCODOC-SPECIFIC INITIALISATION
    from app.scodoc import sco_bulletins_generator
    from app.scodoc.sco_bulletins_legacy import BulletinGeneratorLegacy
    from app.scodoc.sco_bulletins_standard import BulletinGeneratorStandard
    from app.but.bulletin_but_pdf import BulletinGeneratorStandardBUT
    from app.scodoc.sco_bulletins_ucac import BulletinGeneratorUCAC

    # order matters: the first one is the "default" for new departments.
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandard)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandardBUT)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorLegacy)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorUCAC)
    if app.testing or app.debug:
        from app.scodoc.sco_bulletins_example import BulletinGeneratorExample

        sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample)
    from app.auth.cas import set_cas_configuration

    with app.app_context():
        try:
            set_cas_configuration(app)
        except sa.exc.ProgrammingError:
            # If the database has not been upgraded yet (happens during
            # install), the scodoc_site_config table may not exist.
            pass
    return app
def set_sco_dept(scodoc_dept: str, open_cnx=True):
    """Bind the global `g` object to a department.

    Stores the department acronym and id on `g`, opens a db connection when
    `open_cnx` is true and none is stored on `g` yet, and makes sure
    `g.stored_get_formsemestre` exists.

    Aborts with 503 if the department query raises an OperationalError, and
    raises ScoValueError when no department has this acronym.
    """
    # Check that dept exists
    try:
        dept = Departement.query.filter_by(acronym=scodoc_dept).first()
    except sa.exc.OperationalError:
        abort(503)
    if not dept:
        raise ScoValueError(f"Invalid dept: {scodoc_dept}")

    g.scodoc_dept = scodoc_dept  # the acronym
    g.scodoc_dept_id = dept.id  # the id
    if open_cnx:
        if not hasattr(g, "db_conn"):
            ndb.open_db_connection()
    g.setdefault("stored_get_formsemestre", {})
def user_db_init():
    """Initialize the users database.

    Resets the standard roles' permissions, then creates the admin user
    (login SCODOC_ADMIN_LOGIN, mail SCODOC_ADMIN_MAIL) if an admin mail is
    configured and no such user exists yet.
    """
    from app.auth.models import User, Role

    logger = current_app.logger
    logger.info("Init User's db")
    # Create roles:
    Role.reset_standard_roles_permissions()
    logger.info("created initial roles")

    # Ensure that admin exists
    admin_mail = current_app.config.get("SCODOC_ADMIN_MAIL")
    if not admin_mail:
        return
    admin_user_name = current_app.config["SCODOC_ADMIN_LOGIN"]
    if User.query.filter_by(user_name=admin_user_name).first():
        return

    user = User(user_name=admin_user_name, email=admin_mail)
    try:
        db.session.add(user)
        db.session.commit()
    except BaseException:
        # roll back on any failure, then propagate it
        db.session.rollback()
        raise
    message = (
        "created initial admin user, login: {u.user_name}, email: {u.email}".format(
            u=user
        )
    )
    logger.info(message)
def sco_db_insert_constants():
    """Initialize the Sco database by inserting constants (modalities, ...)."""
    from app.models import FormationModalite

    current_app.logger.info("Init Sco db")
    # Modalities:
    FormationModalite.insert_modalites()
def initialize_scodoc_database(erase=False, create_all=False):
    """Initialize the database for unit tests.

    Starts from an existing database and creates all necessary SQL tables
    and functions.
    If `erase` is True, _erase_ all database content.
    If `create_all` is True, create all tables (normally managed by
    migrations, except for tests).
    """
    if erase:
        truncate_database()
    if create_all:
        db.create_all()
    # Then, in order: initial roles and super-admin user, constant values
    # (modalities, ...), and a cache flush.
    for step in (user_db_init, sco_db_insert_constants, clear_scodoc_cache):
        step()
def truncate_database():
    """Erase the content of all tables (including users!) of the current
    database, then restart all SQL sequences.
    """
    session = db.session
    # use a stored SQL function, see createtables.sql
    try:
        session.execute(sa.text("SELECT truncate_tables('scodoc');"))
        session.commit()
    except BaseException:
        session.rollback()
        raise

    # Reset the counters (SQL sequences)
    reset_sequences_sql = sa.text(
        """
CREATE OR REPLACE FUNCTION reset_sequences(username IN VARCHAR) RETURNS void AS $$
DECLARE
statements CURSOR FOR
SELECT sequence_name
FROM information_schema.sequences
ORDER BY sequence_name ;
BEGIN
FOR stmt IN statements LOOP
EXECUTE 'ALTER SEQUENCE ' || quote_ident(stmt.sequence_name) || ' RESTART;';
END LOOP;
END;
$$ LANGUAGE plpgsql;
SELECT reset_sequences('scodoc');
"""
    )
    session.execute(reset_sequences_sql)
    session.commit()
def clear_scodoc_cache():
    """Clear the ScoDoc cache.

    This cache (currently Redis) persists between invocations, so it may
    have to be cleared during development or tests.
    """
    # talk to redis directly, bypassing ScoDoc:
    import redis

    redis.Redis().flushall()
    # Also clear local caches:
    sco_preferences.clear_base_preferences()
# --------- Logging
def log(msg: str):
    """Log a message, prefixed with the current department when available.

    With a Flask app not in DEBUG mode, the app logger is used; otherwise
    the message is written to stderr with a timestamp.
    """
    try:
        dept_acronym = getattr(g, "scodoc_dept", "")
    except RuntimeError:
        # Flask: working outside of application context.
        pass
    else:
        msg = f" ({dept_acronym}) {msg}"

    if current_app and not current_app.config["DEBUG"]:
        current_app.logger.info(msg)
        return

    sys.stdout.flush()
    timestamp = time.strftime("%a %b %d %H:%M:%S %Y")
    sys.stderr.write(f"[{timestamp}] scodoc: {msg}\n")
    sys.stderr.flush()
# Debug: log call stack
def log_call_stack():
    """Log the current call stack, without the frame of this function."""
    caller_frames = traceback.format_stack()[:-1]
    stripped_frames = [frame.strip() for frame in caller_frames]
    log("Call stack:\n" + "\n".join(stripped_frames))
# Alarms by email:
def send_scodoc_alarm(subject, txt):
    """Send an alarm mail with the given subject and text to exception@scodoc.org."""
    from app import email

    recipients = ["exception@scodoc.org"]
    email.send_email(subject, email.get_from_addr(), recipients, txt)
from app.models import Departement
from app.scodoc import notesdb as ndb, sco_preferences
from app.scodoc import sco_cache
def scodoc_flash_status_messages():
    """Flash messages about specific ScoDoc statuses; to call on each page.

    Currently warns when the "email_test_mode_address" preference is set.
    """
    test_address = sco_preferences.get_preference("email_test_mode_address")
    if not test_address:
        return
    flash(
        f"Mode test: mails redirigés vers {test_address}",
        category="warning",
    )
def critical_error(msg):
    """Handle a critical error.

    Logs the message, sends an alarm mail (subject truncated to 68
    characters), flushes all caches, then raises ScoValueError so that the
    message is displayed to the user.
    """
    log(f"\n*** CRITICAL ERROR: {msg}")
    full_subject = f"CRITICAL ERROR: {msg}"
    send_scodoc_alarm(full_subject.strip()[:68], msg)
    clear_scodoc_cache()
    user_message = f"""
Une erreur est survenue, veuillez ré-essayer.
{msg}
"""
    raise ScoValueError(user_message)