511 lines
17 KiB
Python
511 lines
17 KiB
Python
# -*- coding: UTF-8 -*-
|
|
# pylint: disable=invalid-name
|
|
|
|
import datetime
|
|
import os
|
|
import socket
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
import logging
|
|
from logging.handlers import SMTPHandler, WatchedFileHandler
|
|
from threading import Thread
|
|
|
|
from flask import current_app, g, request
|
|
from flask import Flask
|
|
from flask import abort, flash, has_request_context, jsonify
|
|
from flask import render_template
|
|
from flask.logging import default_handler
|
|
from flask_sqlalchemy import SQLAlchemy
|
|
from flask_migrate import Migrate
|
|
from flask_login import LoginManager, current_user
|
|
from flask_mail import Mail
|
|
from flask_bootstrap import Bootstrap
|
|
from flask_moment import Moment
|
|
from flask_caching import Cache
|
|
import sqlalchemy
|
|
|
|
from app.scodoc.sco_exceptions import (
|
|
AccessDenied,
|
|
ScoBugCatcher,
|
|
ScoGenError,
|
|
ScoValueError,
|
|
APIInvalidParams,
|
|
)
|
|
from config import DevConfig
|
|
import sco_version
|
|
|
|
# Extension singletons. They are created unbound at import time and attached
# to the application in create_app() through their init_app() methods.
db = SQLAlchemy()
migrate = Migrate(compare_type=True)
login = LoginManager()
# Login view endpoint and the (French) message associated with it.
login.login_view = "auth.login"
login.login_message = "Identifiez-vous pour accéder à cette page."
mail = Mail()
bootstrap = Bootstrap()
moment = Moment()

# Cache backend selected through the environment; None when the variable is unset.
CACHE_TYPE = os.environ.get("CACHE_TYPE")
cache = Cache(
    config={
        # see https://flask-caching.readthedocs.io/en/latest/index.html#configuring-flask-caching
        "CACHE_TYPE": CACHE_TYPE or "RedisCache",
        # by default, never expire:
        # NOTE(review): a value read from the environment is a str while the
        # fallback is the int 0 -- confirm the cache backend accepts both.
        "CACHE_DEFAULT_TIMEOUT": os.environ.get("CACHE_DEFAULT_TIMEOUT") or 0,
    }
)
|
|
|
|
|
|
def handle_sco_value_error(exc):
    """Render the "sco_value_error.html" page for `exc`.

    Returns a (body, status) pair with HTTP status 404.
    """
    page = render_template("sco_value_error.html", exc=exc)
    return page, 404
|
|
|
|
|
|
def handle_access_denied(exc):
    """Render the "error_access_denied.html" page for `exc`.

    Returns a (body, status) pair with HTTP status 403.
    """
    page = render_template("error_access_denied.html", exc=exc)
    return page, 403
|
|
|
|
|
|
def internal_server_error(exc):
    """ScoDoc bugs, HTTP 500 errors: render the "error_500.html" page.

    The template receives the ScoDoc version, the current date (ISO format),
    the exception and the URL of the current request.

    Returns a (body, status) pair.
    """
    context = {
        "SCOVERSION": sco_version.SCOVERSION,
        "date": datetime.datetime.now().isoformat(),
        "exc": exc,
        "request_url": request.url,
    }
    # note that we set the 500 status explicitly
    return render_template("error_500.html", **context), 500
|
|
|
|
|
|
def handle_sco_bug(exc):
    """Handle a (usually rare) bug on which the developers want more
    information in order to fix it.

    Starts a background thread running _async_dump with the current
    application object and request URL, then returns the standard error 500
    response for `exc`.
    """
    # _async_dump opens its own application context, so it is given the
    # object behind the `current_app` proxy rather than the proxy itself.
    flask_app = current_app._get_current_object()
    dump_thread = Thread(target=_async_dump, args=(flask_app, request.url))
    dump_thread.start()

    return internal_server_error(exc)
|
|
|
|
|
|
def _async_dump(app, request_url: str):
    """Dump the database and send it (thread target of handle_sco_bug).

    Runs inside an application context of `app` and opens a database
    connection before calling sco_dump_and_send_db.

    Args:
        app: the Flask application object (not the `current_app` proxy).
        request_url: URL of the request that triggered the dump.

    A ScoValueError raised by the dump is logged as a warning and not
    propagated: the dump is a best-effort operation.
    """
    from app.scodoc.sco_dump_db import sco_dump_and_send_db

    with app.app_context():
        ndb.open_db_connection()
        try:
            sco_dump_and_send_db("ScoBugCatcher", request_url=request_url)
        except ScoValueError as exc:
            # Keep a trace of the failure instead of dropping it silently.
            app.logger.warning(f"_async_dump: dump not sent ({exc})")
|
|
|
|
|
|
def handle_invalid_usage(error):
    """Build the JSON response for `error`.

    The body is the JSON form of `error.to_dict()` and the HTTP status is
    `error.status_code`.
    """
    payload = error.to_dict()
    json_response = jsonify(payload)
    json_response.status_code = error.status_code
    return json_response
|
|
|
|
|
|
def render_raw_html(template_filename: str, **args) -> str:
    """Load and render an HTML file _without_ using Flask template rendering.

    Necessary for the 503 error message, when the DB is down and Flask may be
    broken. The file is read from the application's template folder and
    filled in with `str.format`.

    Args:
        template_filename: file name, relative to the template folder.
        **args: values substituted for the `{name}` fields of the file.

    Returns:
        The rendered text.

    Raises:
        OSError: if the file cannot be read.
        KeyError: if the file references a field missing from `args`.
    """
    template_path = os.path.join(
        current_app.config["SCODOC_DIR"],
        "app",
        current_app.template_folder,
        template_filename,
    )
    # Decode explicitly as UTF-8 so that the result does not depend on the
    # locale of the server process.
    with open(template_path, encoding="utf-8") as f:
        return f.read().format(**args)
|
|
|
|
|
|
def postgresql_server_error(e):
    """Connection error to the postgresql server (see notesdb.open_db_connection).

    Returns the raw-rendered "error_503.html" page with HTTP status 503.
    """
    body = render_raw_html("error_503.html", SCOVERSION=sco_version.SCOVERSION)
    return body, 503
|
|
|
|
|
|
class LogRequestFormatter(logging.Formatter):
    """Adds the URL and remote_addr to log records"""

    def format(self, record):
        """Set the url, remote_addr, sco_user and sco_admin_mail fields on
        `record`, then format it.

        Outside of a request context, url and remote_addr are None and
        sco_admin_mail is a placeholder string.
        """
        in_request = has_request_context()
        record.url = request.url if in_request else None
        record.remote_addr = request.remote_addr if in_request else None
        record.sco_user = current_user
        record.sco_admin_mail = (
            current_app.config["SCODOC_ADMIN_MAIL"]
            if in_request
            else "(pas de requête)"
        )

        return super().format(record)
|
|
|
|
|
|
class LogExceptionFormatter(logging.Formatter):
    """Formatter for exceptions: adds details about the request"""

    def format(self, record):
        """Set the request-related fields on `record`, then format it.

        Fields: url, remote_addr, http_referrer, http_method, http_params,
        sco_user and sco_admin_mail. Outside of a request context the
        request fields are None and sco_admin_mail is a placeholder string.
        """
        in_request = has_request_context()
        if in_request:
            record.url = request.url
            # use the X-Forwarded-For header when present
            record.remote_addr = request.environ.get(
                "HTTP_X_FORWARDED_FOR", request.remote_addr
            )
            record.http_referrer = request.referrer
            method = request.method
            record.http_method = method
            # GET: query arguments; otherwise the form data, cut at 2048 characters
            record.http_params = (
                str(request.args) if method == "GET" else str(request.form)[:2048]
            )
        else:
            for field in (
                "url",
                "remote_addr",
                "http_referrer",
                "http_method",
                "http_params",
            ):
                setattr(record, field, None)
        record.sco_user = current_user

        record.sco_admin_mail = (
            current_app.config["SCODOC_ADMIN_MAIL"]
            if in_request
            else "(pas de requête)"
        )
        return super().format(record)
|
|
|
|
|
|
class ScoSMTPHandler(SMTPHandler):
    """SMTP log handler whose mail subject describes the logged exception."""

    def getSubject(self, record: logging.LogRecord) -> str:
        """Return the subject of the mail sent for `record`.

        When the record carries an exception, the subject contains the ScoDoc
        version, the exception class name and, if a traceback is available,
        the name and file of the innermost frame.

        When the record carries no exception (e.g. a plain `logger.error(...)`),
        the subject given to the constructor is used, so that the mail can
        still be sent.
        """
        exc_info = record.exc_info
        if not exc_info or exc_info[0] is None:
            # No exception attached to this record: nothing to describe.
            return super().getSubject(record)

        subject = f"ScoExc({sco_version.SCOVERSION}): {exc_info[0].__name__}"
        stack_summary = traceback.extract_tb(exc_info[2])
        if stack_summary:
            # innermost frame: where the exception was raised
            frame_summary = stack_summary[-1]
            subject += f" in {frame_summary.name} {frame_summary.filename}"

        return subject
|
|
|
|
|
|
class ReverseProxied:
    """WSGI adapter that lets all generated URLs use https, except in
    development.

    The HTTP_X_FORWARDED_PROTO variable is set by our nginx configuration."""

    def __init__(self, app):
        """Wrap the WSGI application `app`."""
        self.app = app

    def __call__(self, environ, start_response):
        """Copy a non-empty HTTP_X_FORWARDED_PROTO value into
        `wsgi.url_scheme`, then delegate to the wrapped application."""
        forwarded_proto = environ.get("HTTP_X_FORWARDED_PROTO")
        if not forwarded_proto:
            return self.app(environ, start_response)
        environ["wsgi.url_scheme"] = forwarded_proto  # or force https here?
        return self.app(environ, start_response)
|
|
|
|
|
|
def create_app(config_class=DevConfig):
    """Application factory: build and configure the ScoDoc Flask application.

    Loads the configuration from `config_class`, creates the versioned
    symlink used for static URLs, binds the module-level extensions
    (database, migrations, login, mail, bootstrap, moment, cache), registers
    the error handlers and the blueprints, configures logging (unless
    `app.testing` is set) and registers the bulletin generator classes.

    Args:
        config_class: configuration object given to `app.config.from_object`.
            Its `__name__` and `SQLALCHEMY_DATABASE_URI` attributes are also
            written to the log at startup.

    Returns:
        The configured Flask application.
    """
    app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
    app.wsgi_app = ReverseProxied(app.wsgi_app)
    app.logger.setLevel(logging.INFO)

    # Avoid logging every request in our own log
    logging.getLogger("werkzeug").disabled = True

    app.config.from_object(config_class)

    # Check/create the symlink used for static URLs
    link_filename = f"{app.root_path}/static/links/{sco_version.SCOVERSION}"
    # lexists: a dangling link still occupies the name and must not be recreated
    if not os.path.lexists(link_filename):
        app.logger.info(f"creating symlink {link_filename}")
        try:
            os.symlink("..", link_filename)
        except FileExistsError:
            # The link appeared between the check and the call (e.g. another
            # process starting at the same time): nothing left to do.
            pass

    db.init_app(app)
    migrate.init_app(app, db)
    login.init_app(app)
    mail.init_app(app)
    bootstrap.init_app(app)
    moment.init_app(app)
    cache.init_app(app)
    sco_cache.CACHE = cache
    if CACHE_TYPE:  # non default
        app.logger.info(f"CACHE_TYPE={CACHE_TYPE}")

    app.register_error_handler(ScoGenError, handle_sco_value_error)
    app.register_error_handler(ScoValueError, handle_sco_value_error)
    app.register_error_handler(ScoBugCatcher, handle_sco_bug)
    app.register_error_handler(AccessDenied, handle_access_denied)
    app.register_error_handler(500, internal_server_error)
    app.register_error_handler(503, postgresql_server_error)
    app.register_error_handler(APIInvalidParams, handle_invalid_usage)

    from app.auth import bp as auth_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")

    from app.entreprises import bp as entreprises_bp

    app.register_blueprint(entreprises_bp, url_prefix="/ScoDoc/entreprises")

    from app.views import scodoc_bp
    from app.views import scolar_bp
    from app.views import notes_bp
    from app.views import users_bp
    from app.views import absences_bp
    from app.api import bp as api_bp

    # https://scodoc.fr/ScoDoc
    app.register_blueprint(scodoc_bp)
    # https://scodoc.fr/ScoDoc/RT/Scolarite/...
    app.register_blueprint(scolar_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Notes/...
    app.register_blueprint(notes_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Notes")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Users/...
    app.register_blueprint(users_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Users")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Absences/...
    app.register_blueprint(
        absences_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Absences"
    )
    app.register_blueprint(api_bp, url_prefix="/ScoDoc/api")

    # The additional fields (sco_user, remote_addr, url, ...) used in these
    # format strings are set by the formatter classes themselves.
    scodoc_log_formatter = LogRequestFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s"
    )
    scodoc_exc_formatter = LogExceptionFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s\n"
        "Referrer: %(http_referrer)s\n"
        "Method: %(http_method)s\n"
        "Params: %(http_params)s\n"
        "Admin mail: %(sco_admin_mail)s\n"
    )
    if not app.testing:
        if not app.debug:
            # --- Log configuration for PRODUCTION
            # Remove the default logger, which writes to stderr and pollutes
            # the system logs
            app.logger.removeHandler(default_handler)
            # --- Mail for ERROR and CRITICAL messages
            if app.config["MAIL_SERVER"]:
                auth = None
                if app.config["MAIL_USERNAME"] or app.config["MAIL_PASSWORD"]:
                    auth = (app.config["MAIL_USERNAME"], app.config["MAIL_PASSWORD"])
                secure = None
                if app.config["MAIL_USE_TLS"]:
                    # an empty tuple asks SMTPHandler for TLS without key/cert files
                    secure = ()
                mail_handler = ScoSMTPHandler(
                    mailhost=(app.config["MAIL_SERVER"], app.config["MAIL_PORT"]),
                    fromaddr=app.config["SCODOC_MAIL_FROM"],
                    toaddrs=["exception@scodoc.org"],
                    subject="ScoDoc Exception",  # fallback only, see ScoSMTPHandler
                    credentials=auth,
                    secure=secure,
                )
                mail_handler.setFormatter(scodoc_exc_formatter)
                mail_handler.setLevel(logging.ERROR)
                app.logger.addHandler(mail_handler)
        else:
            # For DEV logs only:
            default_handler.setFormatter(scodoc_log_formatter)

        # Log configuration for both DEV and PRODUCTION
        # (also active in development mode)
        # usually /opt/scodoc-data/log/scodoc.log:
        # rotated by logrotate
        file_handler = WatchedFileHandler(
            app.config["SCODOC_LOG_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_log_formatter)
        file_handler.setLevel(logging.INFO)
        app.logger.addHandler(file_handler)
        # Log for errors (exceptions) only:
        # usually /opt/scodoc-data/log/scodoc_exc.log
        file_handler = WatchedFileHandler(
            app.config["SCODOC_ERR_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_exc_formatter)
        file_handler.setLevel(logging.ERROR)
        app.logger.addHandler(file_handler)

        app.logger.info(f"{sco_version.SCONAME} {sco_version.SCOVERSION} startup")
        # NOTE(review): the database URI is written to the log; confirm it
        # never embeds credentials.
        app.logger.info(
            f"create_app({config_class.__name__}, {config_class.SQLALCHEMY_DATABASE_URI})"
        )
    # ---- SCODOC-SPECIFIC INITIALISATION
    from app.scodoc import sco_bulletins_generator

    from app.scodoc.sco_bulletins_legacy import BulletinGeneratorLegacy
    from app.scodoc.sco_bulletins_standard import BulletinGeneratorStandard
    from app.but.bulletin_but_pdf import BulletinGeneratorStandardBUT
    from app.scodoc.sco_bulletins_ucac import BulletinGeneratorUCAC

    # order matters: the first one will be the "default" for new departments.
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandard)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandardBUT)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorLegacy)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorUCAC)
    if app.testing or app.debug:
        from app.scodoc.sco_bulletins_example import BulletinGeneratorExample

        sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample)

    return app
|
|
|
|
|
|
def set_sco_dept(scodoc_dept: str):
    """Set the global `g` object to the given dept and open the db connection
    if needed.

    Args:
        scodoc_dept: acronym of the department.

    Raises:
        ScoValueError: if no department has this acronym.

    Aborts with a 503 error if the department query raises an
    OperationalError.
    """
    # Check that dept exists
    dept_query = Departement.query.filter_by(acronym=scodoc_dept)
    try:
        departement = dept_query.first()
    except sqlalchemy.exc.OperationalError:
        abort(503)
    if not departement:
        raise ScoValueError(f"Invalid dept: {scodoc_dept}")

    g.scodoc_dept = scodoc_dept  # the acronym
    g.scodoc_dept_id = departement.id  # the id
    if not hasattr(g, "db_conn"):
        ndb.open_db_connection()
    if not hasattr(g, "stored_get_formsemestre"):
        g.stored_get_formsemestre = {}
|
|
|
|
|
|
def user_db_init():
    """Initialize the users database.

    Resets the permissions of the standard roles, then checks that the admin
    user exists and creates it otherwise. The admin user is only handled when
    SCODOC_ADMIN_MAIL is configured; its login is SCODOC_ADMIN_LOGIN.
    """
    from app.auth.models import User, Role

    logger = current_app.logger
    logger.info("Init User's db")
    # Create roles:
    Role.reset_standard_roles_permissions()
    logger.info("created initial roles")

    # Ensure that admin exists
    admin_mail = current_app.config.get("SCODOC_ADMIN_MAIL")
    if not admin_mail:
        return
    admin_user_name = current_app.config["SCODOC_ADMIN_LOGIN"]
    if User.query.filter_by(user_name=admin_user_name).first():
        return

    user = User(user_name=admin_user_name, email=admin_mail)
    try:
        db.session.add(user)
        db.session.commit()
    except BaseException:
        # leave the session usable, then let the error propagate
        db.session.rollback()
        raise
    logger.info(
        "created initial admin user, login: {u.user_name}, email: {u.email}".format(
            u=user
        )
    )
|
|
|
|
|
|
def sco_db_insert_constants():
    """Initialize the Sco database: insert some constants (modalities, ...)."""
    from app import models

    current_app.logger.info("Init Sco db")
    # Modalities:
    modalite_class = models.FormationModalite
    modalite_class.insert_modalites()
|
|
|
|
|
|
def initialize_scodoc_database(erase=False, create_all=False):
    """Initialize the database for unit tests.

    Starts from an existing database and creates all necessary
    SQL tables and functions.

    Args:
        erase: if True, _erase_ all database content first.
        create_all: if True, create all tables with `db.create_all()`.
    """
    from app import models

    if erase:
        # ERASE, through the truncation SQL function
        truncate_database()
    if create_all:
        # Create all tables: managed by migrations, except for TESTS
        db.create_all()

    # Remaining steps always run, in this order:
    # insert initial roles and create the super-admin user,
    user_db_init()
    # insert some constant values (modalities, ...),
    sco_db_insert_constants()
    # flush the cache.
    clear_scodoc_cache()
|
|
|
|
|
|
def truncate_database():
    """Erase the content of all tables (including users!) from
    the current database.

    Raises:
        Any exception raised while executing or committing the statement;
        the session is rolled back before the exception propagates.
    """
    # use a stored SQL function, see createtables.sql
    # The statement is wrapped in text(): recent SQLAlchemy versions no
    # longer accept a plain string in Session.execute().
    statement = sqlalchemy.text("SELECT truncate_tables('scodoc');")
    try:
        db.session.execute(statement)
        db.session.commit()
    except BaseException:
        # leave the session usable, then let the error propagate
        db.session.rollback()
        raise
|
|
|
|
|
|
def clear_scodoc_cache():
    """Clear the ScoDoc cache.

    This cache (currently Redis) is persistent between invocations
    and it may be necessary to clear it during development or tests.
    """
    # talks directly to redis, bypassing ScoDoc:
    import redis

    redis_client = redis.Redis()
    redis_client.flushall()
    # Also clear local caches:
    sco_preferences.clear_base_preferences()
|
|
|
|
|
|
# --------- Logging
|
|
def log(msg: str, silent_test=True):
    """Log a message.

    Inside a Flask app, use the configured logger (INFO level); otherwise
    write to stderr.

    Args:
        msg: the message; when `g` is accessible it is prefixed with the
            current department.
        silent_test: if True, nothing is logged when the app is in TESTING mode.
    """
    if silent_test and current_app and current_app.config["TESTING"]:
        return
    try:
        dept = getattr(g, "scodoc_dept", "")
        msg = f" ({dept}) {msg}"
    except RuntimeError:
        # Flask: working outside of application context.
        pass

    if current_app:
        current_app.logger.info(msg)
        return

    # No application: fall back to stderr
    sys.stdout.flush()
    timestamp = time.strftime("%a %b %d %H:%M:%S %Y")
    sys.stderr.write("[%s] scodoc: %s\n" % (timestamp, msg))
    sys.stderr.flush()
|
|
|
|
|
|
# Debug: log call stack
|
|
def log_call_stack():
    """Log the current call stack (debugging aid).

    The last entry of the formatted stack, which is this function itself,
    is left out.
    """
    caller_frames = traceback.format_stack()[:-1]
    stripped = [entry.strip() for entry in caller_frames]
    log("Call stack:\n" + "\n".join(stripped))
|
|
|
|
|
|
# Alarms by email:
|
|
def send_scodoc_alarm(subject, txt):
    """Send an alarm mail with the given subject and text.

    The sender is the "email_from_addr" preference; the recipient is fixed.
    """
    from app.scodoc import sco_preferences
    from app import email

    recipients = ["exception@scodoc.org"]
    sender = sco_preferences.get_preference("email_from_addr")
    email.send_email(subject, sender, recipients, txt)
|
|
|
|
|
|
from app.models import Departement
|
|
from app.scodoc import notesdb as ndb, sco_preferences
|
|
from app.scodoc import sco_cache
|
|
|
|
|
|
def scodoc_flash_status_messages():
    """Should be called on each page: flash messages indicating specific
    ScoDoc status.

    Currently flashes a warning when the "email_test_mode_address"
    preference is set.
    """
    email_test_mode_address = sco_preferences.get_preference("email_test_mode_address")
    if not email_test_mode_address:
        return
    flash(
        f"Mode test: mails redirigés vers {email_test_mode_address}",
        category="warning",
    )
|