forked from ScoDoc/ScoDoc
687 lines
23 KiB
Python
Executable File
687 lines
23 KiB
Python
Executable File
# -*- coding: UTF-8 -*
|
|
# pylint: disable=invalid-name
|
|
|
|
import base64
|
|
import datetime
|
|
import json
|
|
import os
|
|
import socket
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
import logging
|
|
from logging.handlers import SMTPHandler, WatchedFileHandler
|
|
from threading import Thread
|
|
import warnings
|
|
|
|
from celery import Celery, Task
|
|
from flask import current_app, g, request
|
|
from flask import Flask
|
|
from flask import abort, flash, has_request_context
|
|
from flask import render_template
|
|
from flask.logging import default_handler
|
|
from flask_caching import Cache
|
|
from flask_json import FlaskJSON, json_response
|
|
from flask_login import LoginManager, current_user
|
|
from flask_mail import Mail
|
|
from flask_migrate import Migrate
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
|
|
from jinja2 import select_autoescape
|
|
import numpy as np
|
|
import psycopg2
|
|
from psycopg2.extensions import AsIs as psycopg2_AsIs
|
|
import sqlalchemy as sa
|
|
import werkzeug.debug
|
|
from wtforms.fields import HiddenField
|
|
|
|
from flask_cas import CAS
|
|
|
|
from app.scodoc.sco_exceptions import (
|
|
AccessDenied,
|
|
ScoBugCatcher,
|
|
ScoException,
|
|
ScoGenError,
|
|
ScoInvalidCSRF,
|
|
ScoPDFFormatError,
|
|
ScoValueError,
|
|
APIInvalidParams,
|
|
)
|
|
from app.scodoc.sco_vdi import ApoEtapeVDI
|
|
|
|
from config import DevConfig
|
|
import sco_version
|
|
|
|
# Flask extension singletons. They are created unbound at import time and
# attached to the application in create_app() through their init_app() methods.
db = SQLAlchemy()
migrate = Migrate(compare_type=True)
login = LoginManager()
# Flask-Login settings: the login view endpoint and the (French) message
# attached to it.
login.login_view = "auth.login"
login.login_message = "Identifiez-vous pour accéder à cette page."

mail = Mail()

# Optional cache backend override, read once from the environment at import time.
CACHE_TYPE = os.environ.get("CACHE_TYPE")
cache = Cache(
    config={
        # see https://flask-caching.readthedocs.io/en/latest/index.html#configuring-flask-caching
        "CACHE_TYPE": CACHE_TYPE or "RedisCache",
        # by default, never expire:
        # NOTE(review): a value coming from the environment is a str, while the
        # fallback is the int 0 -- confirm flask-caching accepts both.
        "CACHE_DEFAULT_TIMEOUT": os.environ.get("CACHE_DEFAULT_TIMEOUT") or 0,
    }
)
|
|
|
|
|
|
# NumPy & Psycopg2 (necessary with Numpy 2.0)
|
|
# probablement à changer quand on passera à psycopg3.2
|
|
def adapt_numpy_scalar(numpy_scalar):
    """Adapt a NumPy numeric scalar for psycopg2.

    A NaN value is emitted as the quoted literal ``'NaN'``; any other value
    is wrapped unchanged.
    """
    if np.isnan(numpy_scalar):
        return psycopg2_AsIs("'NaN'")
    return psycopg2_AsIs(numpy_scalar)
|
|
|
|
|
|
# Register adapt_numpy_scalar for each NumPy scalar type handled here
# (32/64-bit floats and integers).
psycopg2.extensions.register_adapter(np.float32, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.float64, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.int32, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.int64, adapt_numpy_scalar)
|
|
|
|
|
|
def handle_sco_value_error(exc):
    """Render the error page showing the exception's message.

    Returns the rendered ``sco_value_error.j2`` template with status 404.
    """
    page = render_template("sco_value_error.j2", exc=exc)
    return page, 404
|
|
|
|
|
|
def handle_access_denied(exc):
    """Render the access-denied page.

    Returns the rendered ``error_access_denied.j2`` template with status 403.
    """
    page = render_template("error_access_denied.j2", exc=exc)
    return page, 403
|
|
|
|
|
|
def handle_invalid_csrf(exc):
    """Handle a form submitted with an invalid CSRF token.

    The current user is logged out, then the ``error_csrf.j2`` page is
    returned with status 404.
    """
    # Imported here rather than at module level.
    from app import auth as auth_package

    # Log the user out before showing the error page.
    auth_package.logic.logout()
    page = render_template("error_csrf.j2", exc=exc)
    return page, 404
|
|
|
|
|
|
# PDF format errors are reported with the same handler (and page) as
# ScoValueError.
handle_pdf_format_error = handle_sco_value_error
|
|
|
|
|
|
def internal_server_error(exc):
    """Handle ScoDoc bugs (HTTP 500 errors).

    Clears all caches, then renders ``error_500.j2`` with the ScoDoc version,
    the current date, the exception, the current traceback (URL-safe base64)
    and the request URL. The 500 status is set explicitly.
    """
    from app.scodoc import sco_utils as scu

    # Invalidate every cache before building the error page.
    log("internal_server_error: clearing caches")
    clear_scodoc_cache()

    template_context = {
        "SCOVERSION": sco_version.SCOVERSION,
        "date": datetime.datetime.now().isoformat(),
        "exc": exc,
        "traceback_str_base64": base64.urlsafe_b64encode(
            traceback.format_exc().encode(scu.SCO_ENCODING)
        ).decode(scu.SCO_ENCODING),
        "request_url": request.url,
        "scu": scu,
    }
    return render_template("error_500.j2", **template_context), 500
|
|
|
|
|
|
def handle_sco_bug(exc):
    """Handle a (usually rare) bug the developers want more information about.

    With TESTING or DEBUG enabled, ``ScoException`` is raised. Otherwise a
    thread running ``_async_dump`` is started with the application object and
    the request URL, and the standard 500 error response is returned.
    """
    config = current_app.config
    if config["TESTING"] or config["DEBUG"]:
        raise ScoException  # for development servers only

    dump_thread = Thread(
        target=_async_dump, args=(current_app._get_current_object(), request.url)
    )
    dump_thread.start()
    return internal_server_error(exc)
|
|
|
|
|
|
def _async_dump(app, request_url: str):
    """Dump and send the database from a background thread.

    Runs ``sco_dump_and_send_db`` inside an application context of ``app``
    after opening the database connection.

    Args:
        app: the Flask application object providing the context.
        request_url: URL of the request that triggered the dump.

    A ``ScoValueError`` raised by the dump is not propagated, so the thread
    ends quietly, but it is logged so that the failure leaves a trace.
    """
    from app.scodoc.sco_dump_db import sco_dump_and_send_db

    with app.app_context():
        ndb.open_db_connection()
        try:
            sco_dump_and_send_db("ScoBugCatcher", request_url=request_url)
        except ScoValueError as exc:
            # Best effort: report the failure instead of dropping it silently.
            log(f"_async_dump: database dump failed: {exc}")
|
|
|
|
|
|
def handle_invalid_usage(error):
    """Build the JSON response for an API error.

    The body is ``error.to_dict()`` and the HTTP status is
    ``error.status_code``.
    """
    json_reply = json_response(data_=error.to_dict())
    json_reply.status_code = error.status_code
    return json_reply
|
|
|
|
|
|
# JSON ENCODING
|
|
# used by some internal functions
|
|
# the API is now using flask_json, NOT THIS ENCODER
|
|
class ScoDocJSONEncoder(json.JSONEncoder):
    """JSON encoder that also handles dates and ``ApoEtapeVDI`` objects."""

    def default(self, o):  # pylint: disable=E0202
        """Return a JSON-serializable value for ``o``.

        Dates and datetimes become ISO 8601 strings, ``ApoEtapeVDI`` objects
        become ``str(o)``; anything else is left to the base encoder.
        """
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()
        if isinstance(o, ApoEtapeVDI):
            return str(o)
        return super().default(o)
|
|
|
|
|
|
def render_raw_html(template_filename: str, **args) -> str:
    """Load and render an HTML file _without_ using Flask templating.

    Necessary for the 503 error message, when the DB is down and Flask may be
    broken. The file is located under
    ``<SCODOC_DIR>/app/<template_folder>/<template_filename>`` and its content
    is filled in with ``str.format(**args)``.

    Args:
        template_filename: name of the file inside the template folder.
        **args: values substituted into the ``{name}`` fields of the file.

    Returns:
        The formatted text.
    """
    template_path = os.path.join(
        current_app.config["SCODOC_DIR"],
        "app",
        current_app.template_folder,
        template_filename,
    )
    # Decode explicitly as UTF-8 so the result does not depend on the
    # server's locale.
    with open(template_path, encoding="utf-8") as template_file:
        return template_file.read().format(**args)
|
|
|
|
|
|
def postgresql_server_error(e):
    """Handle a failed connection to the PostgreSQL server.

    See notesdb.open_db_connection. Returns the raw-rendered ``error_503.j2``
    page with status 503.
    """
    page = render_raw_html("error_503.j2", SCOVERSION=sco_version.SCOVERSION)
    return page, 503
|
|
|
|
|
|
class LogRequestFormatter(logging.Formatter):
    """Formatter adding the request URL and remote address to log records."""

    def format(self, record):
        """Set ``url``, ``remote_addr``, ``sco_user`` and ``sco_admin_mail``
        on the record, then delegate to the base formatter.

        Outside of a request context, ``url`` and ``remote_addr`` are ``None``
        and ``sco_admin_mail`` is a placeholder text.
        """
        in_request = has_request_context()
        record.url = request.url if in_request else None
        record.remote_addr = request.remote_addr if in_request else None
        record.sco_user = current_user
        record.sco_admin_mail = (
            current_app.config["SCODOC_ADMIN_MAIL"]
            if in_request
            else "(pas de requête)"
        )
        return super().format(record)
|
|
|
|
|
|
class LogExceptionFormatter(logging.Formatter):
    """Formatter for exceptions: adds request details to log records."""

    def format(self, record):
        """Attach request details to the record, then delegate to the base
        formatter.

        Inside a request context the record receives the URL, the remote
        address (``HTTP_X_FORWARDED_FOR`` if present), the referrer, the
        method and the parameters (query args for GET, otherwise the form
        data cut to 2048 characters). Outside of one, these are ``None``.
        """
        in_request = has_request_context()
        if in_request:
            record.url = request.url
            record.remote_addr = request.environ.get(
                "HTTP_X_FORWARDED_FOR", request.remote_addr
            )
            record.http_referrer = request.referrer
            record.http_method = request.method
            record.http_params = (
                str(request.args)
                if request.method == "GET"
                else str(request.form)[:2048]
            )
        else:
            for field_name in (
                "url",
                "remote_addr",
                "http_referrer",
                "http_method",
                "http_params",
            ):
                setattr(record, field_name, None)
        record.sco_user = current_user

        record.sco_admin_mail = (
            current_app.config["SCODOC_ADMIN_MAIL"]
            if in_request
            else "(pas de requête)"
        )
        return super().format(record)
|
|
|
|
|
|
class ScoSMTPHandler(SMTPHandler):
    """SMTP log handler whose mail subject summarises the logged exception."""

    def getSubject(self, record: logging.LogRecord) -> str:
        """Build the mail subject for ``record``.

        When the record carries exception information, the subject names the
        exception class and the function and file of the innermost traceback
        frame. Records logged without an exception (``exc_info`` missing or
        empty) get a subject made of the level name and the first line of the
        message, instead of failing while subscripting ``exc_info``.

        Args:
            record: the log record being mailed.

        Returns:
            The subject line, prefixed with the ScoDoc version.
        """
        prefix = f"ScoExc({sco_version.SCOVERSION})"
        exc_info = record.exc_info
        if not exc_info or exc_info[0] is None:
            # e.g. logger.error("...") called outside of an except block.
            message_lines = record.getMessage().splitlines()
            first_line = message_lines[0][:80] if message_lines else ""
            return f"{prefix}: {record.levelname} {first_line}".rstrip()

        exc_name = exc_info[0].__name__
        stack_summary = traceback.extract_tb(exc_info[2])
        if not stack_summary:
            # Exception without traceback frames: nothing to locate.
            return f"{prefix}: {exc_name}"
        frame_summary = stack_summary[-1]
        return f"{prefix}: {exc_name} in {frame_summary.name} {frame_summary.filename}"
|
|
|
|
|
|
class ReverseProxied:
    """WSGI adapter that lets all computed URLs use https, except in
    development.

    The HTTP_X_FORWARDED_PROTO variable is set by our nginx configuration.
    """

    def __init__(self, app):
        """Wrap the WSGI callable ``app``."""
        self.app = app

    def __call__(self, environ, start_response):
        """Copy a non-empty ``HTTP_X_FORWARDED_PROTO`` into
        ``wsgi.url_scheme``, then call the wrapped application."""
        forwarded_proto = environ.get("HTTP_X_FORWARDED_PROTO")
        if forwarded_proto:
            # Original note: or should https be forced here?
            environ["wsgi.url_scheme"] = forwarded_proto
        return self.app(environ, start_response)
|
|
|
|
|
|
def celery_init_app(app: Flask) -> Celery:
    """Create the Celery application bound to the Flask ``app``.

    See https://flask.palletsprojects.com/en/latest/patterns/celery

    The Celery app is configured from ``app.config["CELERY"]``, set as the
    default Celery app and stored in ``app.extensions["celery"]``.
    """

    class FlaskTask(Task):
        "Task in Flask context"

        def __call__(self, *args: object, **kwargs: object) -> object:
            # Run the task body inside an application context.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_instance = Celery(app.name, task_cls=FlaskTask)
    celery_instance.config_from_object(app.config["CELERY"])
    celery_instance.set_default()
    app.extensions["celery"] = celery_instance
    return celery_instance
|
|
|
|
|
|
def create_app(config_class=DevConfig) -> Flask:
    """Create and configure the ScoDoc Flask application.

    In order: loads the configuration, installs CAS, the reverse-proxy WSGI
    adapter, Celery and the JSON encoder, configures warnings and the static
    symlink, initialises the Flask extensions, registers error handlers and
    blueprints, configures Jinja2 and logging, registers the bulletin
    generator classes and finally applies the CAS configuration.

    Args:
        config_class: configuration class passed to ``app.config.from_object``.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
    app.config.from_object(config_class)
    from app.auth import cas

    CAS(app, url_prefix="/cas", configuration_function=cas.set_cas_configuration)
    app.wsgi_app = ReverseProxied(app.wsgi_app)
    # --- Celery
    celery_init_app(app)
    # --- JSON
    app_json = FlaskJSON(app)

    @app_json.encoder
    def scodoc_json_encoder(o):
        "Override default date encoding (RFC 822) and use ISO"
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()

    # Keep the order of objects in JSON output:
    # e.g. the order of the UEs in the bulletins
    app.json.sort_keys = False

    # Avoid logging every request in our log
    logging.getLogger("werkzeug").disabled = True
    app.logger.setLevel(app.config["LOG_LEVEL"])
    if app.config["TESTING"] or app.config["DEBUG"]:
        # Original comment: "stop on all warnings, except
        # flask_sqlalchemy/query (deprecation of model.get())".
        # NOTE(review): the call below installs an "error" filter for modules
        # matching "flask_sqlalchemy/query" -- confirm which one is intended.
        warnings.filterwarnings("error", module="flask_sqlalchemy/query")
        # warnings.filterwarnings("ignore", module="json/provider.py") xxx no effect in tests
    if app.config["DEBUG"]:
        # since the werkzeug logs were disabled above,
        # we print the PIN ourselves in debug mode:
        print(
            f""" * Debugger is active!
 * Debugger PIN: {werkzeug.debug.get_pin_and_cookie_name(app)[0]}
"""
        )
    # Check/create the symlink used for static URLs
    link_filename = f"{app.root_path}/static/links/{sco_version.SCOVERSION}"
    if not os.path.exists(link_filename):
        app.logger.info(f"creating symlink {link_filename}")
        os.symlink("..", link_filename)

    db.init_app(app)
    migrate.init_app(app, db)
    login.init_app(app)
    mail.init_app(app)
    app.extensions["mail"].debug = 0  # disable copy of mails to stderr
    cache.init_app(app)
    sco_cache.CACHE = cache
    if CACHE_TYPE:  # non default
        app.logger.info(f"CACHE_TYPE={CACHE_TYPE}")

    # Map ScoDoc exceptions and HTTP error codes to their handlers.
    app.register_error_handler(ScoGenError, handle_sco_value_error)
    app.register_error_handler(ScoValueError, handle_sco_value_error)
    app.register_error_handler(ScoBugCatcher, handle_sco_bug)
    app.register_error_handler(ScoInvalidCSRF, handle_invalid_csrf)
    app.register_error_handler(ScoPDFFormatError, handle_pdf_format_error)
    app.register_error_handler(AccessDenied, handle_access_denied)
    app.register_error_handler(500, internal_server_error)
    app.register_error_handler(503, postgresql_server_error)
    app.register_error_handler(APIInvalidParams, handle_invalid_usage)

    from app.auth import bp as auth_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")

    from app.entreprises import bp as entreprises_bp

    app.register_blueprint(entreprises_bp, url_prefix="/ScoDoc/entreprises")

    from app.views import scodoc_bp
    from app.views import scolar_bp
    from app.views import notes_bp
    from app.views import users_bp
    from app.views import absences_bp
    from app.views import assiduites_bp
    from app.api import api_bp
    from app.api import api_web_bp

    # Jinja2 configuration
    # Enable autoescaping of all templates, including .j2
    app.jinja_env.autoescape = select_autoescape(default_for_string=True, default=True)
    app.jinja_env.trim_blocks = True
    app.jinja_env.lstrip_blocks = True
    # Template helper: tells whether a form field is a wtforms HiddenField.
    app.jinja_env.globals["is_hidden_field"] = lambda field: isinstance(
        field, HiddenField
    )

    # https://scodoc.fr/ScoDoc
    app.register_blueprint(scodoc_bp)
    # https://scodoc.fr/ScoDoc/RT/Scolarite/...
    app.register_blueprint(scolar_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Notes/...
    app.register_blueprint(notes_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Notes")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Users/...
    app.register_blueprint(users_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Users")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Absences/...
    app.register_blueprint(
        absences_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Absences"
    )
    app.register_blueprint(
        assiduites_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Assiduites"
    )
    app.register_blueprint(api_bp, url_prefix="/ScoDoc/api")
    app.register_blueprint(api_web_bp, url_prefix="/ScoDoc/<scodoc_dept>/api")

    scodoc_log_formatter = LogRequestFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s"
    )
    # the additional fields are set by the formatter classes
    # (LogRequestFormatter / LogExceptionFormatter)
    scodoc_exc_formatter = LogExceptionFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s\n"
        "Referrer: %(http_referrer)s\n"
        "Method: %(http_method)s\n"
        "Params: %(http_params)s\n"
        "Admin mail: %(sco_admin_mail)s\n"
    )
    if not app.testing:
        if not app.debug:
            # --- Log configuration for PRODUCTION
            # Remove the default logger, which writes to stderr and pollutes the system logs
            app.logger.removeHandler(default_handler)
            # --- Mail ERROR and CRITICAL messages
            if app.config["MAIL_SERVER"]:
                auth = None
                if app.config["MAIL_USERNAME"] or app.config["MAIL_PASSWORD"]:
                    auth = (app.config["MAIL_USERNAME"], app.config["MAIL_PASSWORD"])
                secure = None
                if app.config["MAIL_USE_TLS"]:
                    secure = ()
                # NOTE(review): host_name is not used in the rest of this function.
                host_name = socket.gethostname()
                mail_handler = ScoSMTPHandler(
                    mailhost=(app.config["MAIL_SERVER"], app.config["MAIL_PORT"]),
                    fromaddr=app.config["SCODOC_MAIL_FROM"],
                    toaddrs=["exception@scodoc.org"],
                    subject="ScoDoc Exception",  # unused see ScoSMTPHandler
                    credentials=auth,
                    secure=secure,
                )
                mail_handler.setFormatter(scodoc_exc_formatter)
                mail_handler.setLevel(logging.ERROR)
                app.logger.addHandler(mail_handler)
        else:
            # For DEV logs only:
            default_handler.setFormatter(scodoc_log_formatter)

        # Log configuration for DEV and PRODUCTION
        # (also active in development mode)
        # usually /opt/scodoc-data/log/scodoc.log:
        # rotated by logrotate
        file_handler = WatchedFileHandler(
            app.config["SCODOC_LOG_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_log_formatter)
        file_handler.setLevel(logging.INFO)
        app.logger.addHandler(file_handler)
        # Log for errors (exceptions) only:
        # usually /opt/scodoc-data/log/scodoc_exc.log
        file_handler = WatchedFileHandler(
            app.config["SCODOC_ERR_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_exc_formatter)
        file_handler.setLevel(logging.ERROR)
        app.logger.addHandler(file_handler)

        # app.logger.setLevel(logging.INFO)
        app.logger.info(f"{sco_version.SCONAME} {sco_version.SCOVERSION} startup")
        app.logger.info(
            f"create_app({config_class.__name__}, {config_class.SQLALCHEMY_DATABASE_URI})"
        )
    # ---- SCODOC-SPECIFIC INITIALISATION
    from app.scodoc import sco_bulletins_generator

    from app.scodoc.sco_bulletins_legacy import BulletinGeneratorLegacy
    from app.scodoc.sco_bulletins_standard import BulletinGeneratorStandard
    from app.but.bulletin_but_pdf import BulletinGeneratorStandardBUT
    from app.scodoc.sco_bulletins_ucac import BulletinGeneratorUCAC

    # order matters: the first one will be the "default" for new departments.
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandard)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandardBUT)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorLegacy)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorUCAC)
    if app.testing or app.debug:
        from app.scodoc.sco_bulletins_example import BulletinGeneratorExample

        sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample)

    from app.auth.cas import set_cas_configuration

    with app.app_context():
        try:
            set_cas_configuration(app)
        except sa.exc.ProgrammingError:
            # If the database has not been upgraded yet (happens during install)
            # the scodoc_site_config table may not exist yet.
            pass
    return app
|
|
|
|
|
|
def set_sco_dept(scodoc_dept: str, open_cnx=True):
    """Set the global ``g`` object to the given department.

    Stores the acronym and the id of the department on ``g``, opens the db
    connection when ``open_cnx`` is true and ``g`` has no ``db_conn`` yet, and
    makes sure ``g.stored_get_formsemestre`` exists.

    Aborts with 503 if the department query raises an ``OperationalError``;
    raises ``ScoValueError`` if no department has this acronym.
    """
    # Check that dept exists
    try:
        dept = Departement.query.filter_by(acronym=scodoc_dept).first()
    except sa.exc.OperationalError:
        abort(503)
    if not dept:
        raise ScoValueError(f"Invalid dept: {scodoc_dept}")

    g.scodoc_dept = scodoc_dept  # the acronym
    g.scodoc_dept_id = dept.id  # the id

    connection_needed = open_cnx and not hasattr(g, "db_conn")
    if connection_needed:
        ndb.open_db_connection()
    if not hasattr(g, "stored_get_formsemestre"):
        g.stored_get_formsemestre = {}
|
|
|
|
|
|
def user_db_init():
    """Initialize the users database.

    Resets the standard roles' permissions, then makes sure the admin user
    exists: when ``SCODOC_ADMIN_MAIL`` is configured and no user is named
    ``SCODOC_ADMIN_LOGIN``, that user is created and committed (the session
    is rolled back and the error re-raised on failure).
    """
    from app.auth.models import User, Role

    current_app.logger.info("Init User's db")
    # Create roles:
    Role.reset_standard_roles_permissions()
    current_app.logger.info("created initial roles")

    # Ensure that admin exists
    admin_mail = current_app.config.get("SCODOC_ADMIN_MAIL")
    if not admin_mail:
        return
    admin_user_name = current_app.config["SCODOC_ADMIN_LOGIN"]
    if User.query.filter_by(user_name=admin_user_name).first():
        return

    admin = User(user_name=admin_user_name, email=admin_mail)
    try:
        db.session.add(admin)
        db.session.commit()
    except:
        db.session.rollback()
        raise
    current_app.logger.info(
        "created initial admin user, login: {u.user_name}, email: {u.email}".format(
            u=admin
        )
    )
|
|
|
|
|
|
def sco_db_insert_constants():
    """Initialize the Sco database by inserting some constants (modalities, ...)."""
    from app.models import FormationModalite

    current_app.logger.info("Init Sco db")
    # Modalities:
    FormationModalite.insert_modalites()
|
|
|
|
|
|
def initialize_scodoc_database(erase=False, create_all=False):
    """Initialize the database for unit tests.

    Starts from an existing database and creates all necessary SQL tables
    and functions.

    Args:
        erase: if true, _erase_ all database content first.
        create_all: if true, create all tables with ``db.create_all()``
            (tables are managed by migrations, except for tests).
    """
    # Optional preliminary steps, in this order: erase, then create tables.
    preliminary_steps = ((erase, truncate_database), (create_all, db.create_all))
    for enabled, step in preliminary_steps:
        if enabled:
            step()
    # Insert initial roles and create the super-admin user
    user_db_init()
    # Insert some constant values (modalities, ...)
    sco_db_insert_constants()
    # Flush cache
    clear_scodoc_cache()
|
|
|
|
|
|
def truncate_database():
    """Erase the content of all tables (including users!) from the current
    database, then restart every SQL sequence.
    """
    reset_sequences_sql = sa.text(
        """
    CREATE OR REPLACE FUNCTION reset_sequences(username IN VARCHAR) RETURNS void AS $$
    DECLARE
        statements CURSOR FOR
            SELECT sequence_name
            FROM information_schema.sequences
            ORDER BY sequence_name ;
    BEGIN
        FOR stmt IN statements LOOP
            EXECUTE 'ALTER SEQUENCE ' || quote_ident(stmt.sequence_name) || ' RESTART;';
        END LOOP;
    END;
    $$ LANGUAGE plpgsql;

    SELECT reset_sequences('scodoc');
    """
    )
    session = db.session
    # use a stored SQL function, see createtables.sql
    try:
        session.execute(sa.text("SELECT truncate_tables('scodoc');"))
        session.commit()
    except:
        session.rollback()
        raise
    # Reset the counters (SQL sequences) to zero
    session.execute(reset_sequences_sql)
    session.commit()
|
|
|
|
|
|
def clear_scodoc_cache():
    """Clear the ScoDoc cache.

    This cache (currently Redis) is persistent between invocations and it may
    be necessary to clear it during development or tests.
    """
    # talks to redis directly, bypassing ScoDoc:
    import redis

    redis.Redis().flushall()
    # Also clear local caches:
    sco_preferences.clear_base_preferences()
|
|
|
|
|
|
# --------- Logging
|
|
def log(msg: str):
    """Log a message.

    With a Flask app not in DEBUG mode, the app's configured logger is used;
    otherwise the message is written to stderr with a timestamp. The current
    department is prepended when an application context is available.
    """
    try:
        current_dept = getattr(g, "scodoc_dept", "")
    except RuntimeError:
        # Flask: working outside of application context.
        pass
    else:
        msg = f" ({current_dept}) {msg}"

    if not current_app or current_app.config["DEBUG"]:
        sys.stdout.flush()
        timestamp = time.strftime("%a %b %d %H:%M:%S %Y")
        sys.stderr.write(f"""[{timestamp}] scodoc: {msg}\n""")
        sys.stderr.flush()
        return
    current_app.logger.info(msg)
|
|
|
|
|
|
# Debug: log call stack
|
|
def log_call_stack():
    """Log the current call stack, without this function's own frame."""
    caller_frames = traceback.format_stack()[:-1]
    stripped_frames = [frame_text.strip() for frame_text in caller_frames]
    log("Call stack:\n" + "\n".join(stripped_frames))
|
|
|
|
|
|
# Alarms by email:
|
|
def send_scodoc_alarm(subject, txt):
    """Send an alarm mail with the given subject and text to
    exception@scodoc.org, from the address given by ``email.get_from_addr()``.
    """
    from app import email as email_module

    from_addr = email_module.get_from_addr()
    recipients = ["exception@scodoc.org"]
    email_module.send_email(subject, from_addr, recipients, txt)
|
|
|
|
|
|
from app.models import Departement
|
|
from app.scodoc import notesdb as ndb, sco_preferences
|
|
from app.scodoc import sco_cache
|
|
|
|
|
|
def scodoc_flash_status_messages():
    """Flash messages indicating a specific ScoDoc status.

    Should be called on each page. Currently flashes a warning when the
    ``email_test_mode_address`` preference is set.
    """
    test_address = sco_preferences.get_preference("email_test_mode_address")
    if not test_address:
        return
    flash(
        f"Mode test: mails redirigés vers {test_address}",
        category="warning",
    )
|
|
|
|
|
|
def critical_error(msg):
    """Handle a critical error.

    Logs the message, sends an alarm mail (subject cut to 68 characters),
    flushes all caches and raises ``ScoValueError`` with a message for the
    user.
    """
    log(f"\n*** CRITICAL ERROR: {msg}")
    alarm_subject = f"CRITICAL ERROR: {msg}".strip()[:68]
    send_scodoc_alarm(alarm_subject, msg)
    clear_scodoc_cache()
    user_message = f"""
    Une erreur est survenue, veuillez ré-essayer.

    {msg}
    """
    raise ScoValueError(user_message)
|