ScoDoc-PE/app/__init__.py

242 lines
7.4 KiB
Python

# -*- coding: UTF-8 -*-
# pylint: disable=invalid-name
from app.scodoc.sco_exceptions import ScoValueError
import os
import sys
import logging
from logging.handlers import SMTPHandler, TimedRotatingFileHandler
from flask import request
from flask import Flask
from flask import current_app
from flask import g
from flask import render_template
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager
from flask_mail import Mail
from flask_bootstrap import Bootstrap
from flask_moment import Moment
from flask_caching import Cache
from config import DevConfig
import sco_version
# Flask extension singletons. They are created unbound at import time and
# attached to the application by create_app() through init_app().
db = SQLAlchemy()
migrate = Migrate()
mail = Mail()
bootstrap = Bootstrap()
moment = Moment()

# Login manager and its settings
login = LoginManager()
login.login_view = "auth.login"
login.login_message = "Please log in to access this page."

# XXX TODO: read the cache settings from a configuration file
# (alternative backend: "CACHE_TYPE": "MemcachedCache")
cache = Cache(config={"CACHE_TYPE": "RedisCache"})
def handle_sco_value_error(exc):
    """Error handler for ScoValueError.

    Renders the "sco_value_error.html" template with the exception and
    returns it together with HTTP status 404.
    """
    page = render_template("sco_value_error.html", exc=exc)
    return page, 404
def create_app(config_class=DevConfig):
    """Build and configure the ScoDoc Flask application.

    Loads the configuration, binds the module-level extensions, registers
    the ScoValueError handler and the blueprints and, when the app is
    neither in debug nor in testing mode, installs the production log
    handlers (e-mail for errors, weekly rotated file for INFO and above).

    Args:
        config_class: configuration object passed to
            ``app.config.from_object`` (defaults to ``DevConfig``).

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
    app.logger.setLevel(logging.DEBUG)
    app.config.from_object(config_class)
    app.logger.info(f"create_app({config_class.__name__})")

    # Bind the module-level extension instances to this application
    db.init_app(app)
    migrate.init_app(app, db)
    login.init_app(app)
    mail.init_app(app)
    bootstrap.init_app(app)
    moment.init_app(app)
    cache.init_app(app)
    # sco_cache is imported at the end of this module
    sco_cache.CACHE = cache

    app.register_error_handler(ScoValueError, handle_sco_value_error)

    # Blueprints are imported locally
    # (presumably to avoid circular imports -- TODO confirm)
    from app.auth import bp as auth_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")

    from app.views import scodoc_bp
    from app.views import scolar_bp
    from app.views import notes_bp
    from app.views import users_bp
    from app.views import absences_bp

    # https://scodoc.fr/ScoDoc
    app.register_blueprint(scodoc_bp)
    # https://scodoc.fr/ScoDoc/RT/Scolarite/...
    app.register_blueprint(scolar_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Notes/...
    app.register_blueprint(notes_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Notes")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Users/...
    app.register_blueprint(users_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Users")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Absences/...
    app.register_blueprint(
        absences_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Absences"
    )

    if not app.debug and not app.testing:
        # --- Production logging
        if app.config["MAIL_SERVER"]:
            auth = None
            if app.config["MAIL_USERNAME"] or app.config["MAIL_PASSWORD"]:
                auth = (app.config["MAIL_USERNAME"], app.config["MAIL_PASSWORD"])
            secure = None
            if app.config["MAIL_USE_TLS"]:
                # An empty tuple asks SMTPHandler for TLS without key/cert files
                secure = ()
            # ADMINS may hold one address or a sequence of addresses:
            # SMTPHandler needs a flat list of strings, so do not nest a list.
            admins = app.config["ADMINS"]
            if isinstance(admins, (list, tuple)):
                toaddrs = list(admins)
            else:
                toaddrs = [admins]
            mail_handler = SMTPHandler(
                mailhost=(app.config["MAIL_SERVER"], app.config["MAIL_PORT"]),
                fromaddr="no-reply@" + app.config["MAIL_SERVER"],
                toaddrs=toaddrs,
                subject="ScoDoc Failure",
                credentials=auth,
                secure=secure,
            )
            mail_handler.setLevel(logging.ERROR)
            app.logger.addHandler(mail_handler)

        # Make sure the directory that actually receives the log file exists
        # (the handler cannot be created when it is missing).
        log_filename = "/opt/scodoc-data/log/scodoc.log"
        os.makedirs(os.path.dirname(log_filename), exist_ok=True)
        file_handler = TimedRotatingFileHandler(
            log_filename,
            when="W6",  # Sunday
            backupCount=53,  # one year of logs
        )
        file_handler.setFormatter(
            logging.Formatter(
                "%(asctime)s %(levelname)s: %(message)s " "[in %(pathname)s:%(lineno)d]"
            )
        )
        file_handler.setLevel(logging.INFO)
        app.logger.addHandler(file_handler)

        app.logger.setLevel(logging.INFO)
        app.logger.info(f"{sco_version.SCONAME} {sco_version.SCOVERSION} startup")

    return app
def set_sco_dept(scodoc_dept: str):
    """Make `scodoc_dept` the current department on flask's `g` object.

    The department is looked up by acronym; its acronym and id are stored
    in `g.scodoc_dept` and `g.scodoc_dept_id`. A db connection is opened
    through `ndb.open_db_connection()` when `g` has no `db_conn` attribute.

    Raises:
        ScoValueError: no department matches this acronym.
    """
    departement = Departement.query.filter_by(acronym=scodoc_dept).first()
    if not departement:
        # Unknown department
        raise ScoValueError(f"Invalid dept: {scodoc_dept}")

    g.scodoc_dept = scodoc_dept  # the acronym
    g.scodoc_dept_id = departement.id  # the id

    if hasattr(g, "db_conn"):
        return
    ndb.open_db_connection()
def user_db_init():
    """Initialize the users database.

    Inserts the roles, then, when SCODOC_ADMIN_MAIL is configured, creates
    the user named by SCODOC_ADMIN_LOGIN unless it already exists.
    """
    from app.auth.models import User, Role

    log = current_app.logger
    log.info("Init User's db")

    # Roles first
    Role.insert_roles()
    log.info("created initial roles")

    # Then make sure the admin user exists
    admin_mail = current_app.config.get("SCODOC_ADMIN_MAIL")
    if not admin_mail:
        return
    admin_login = current_app.config["SCODOC_ADMIN_LOGIN"]
    if User.query.filter_by(user_name=admin_login).first():
        # Already there: nothing to create
        return

    admin = User(user_name=admin_login, email=admin_mail)
    try:
        db.session.add(admin)
        db.session.commit()
    except BaseException:
        # Whatever went wrong: leave the session clean, then propagate
        db.session.rollback()
        raise
    log.info(
        "created initial admin user, login: {u.user_name}, email: {u.email}".format(
            u=admin
        )
    )
def sco_db_init():
    """Initialize the Sco database: insert the modalities
    (`NotesFormModalite.insert_modalites`).
    """
    from app import models

    current_app.logger.info("Init Sco db")
    # Modalities
    insert_modalites = models.NotesFormModalite.insert_modalites
    insert_modalites()
def initialize_scodoc_database(erase=False):
    """Initialize the database.

    Works on an existing database: creates the necessary SQL functions
    and tables, inserts the initial data and flushes the cache.

    Args:
        erase: when True, _erase_ all database content before the tables
            are (re)created.
    """
    from app import models

    # Ordered list of steps; the order matters.
    steps = [
        # our specific functions and sequences, not generated by SQLAlchemy
        models.create_database_functions,
    ]
    if erase:
        # ERASE (relies on the truncation sql function defined just before)
        steps.append(truncate_database)
    steps.extend(
        [
            db.create_all,  # create all tables
            user_db_init,  # initial roles and super-admin user
            sco_db_init,  # some constant values (modalities, ...)
            clear_scodoc_cache,  # flush cache
        ]
    )
    for step in steps:
        step()
def truncate_database():
    """Erase content of all tables (including users !) from
    the current database.

    Calls the stored SQL function `truncate_tables` (see createtables.sql)
    and commits. On any failure the session is rolled back and the
    exception is re-raised.
    """
    # Wrap the statement in text(): passing a plain string to
    # Session.execute() is deprecated in SQLAlchemy 1.4 and rejected in 2.0.
    statement = db.text("SELECT truncate_tables('scodoc');")
    try:
        db.session.execute(statement)
        db.session.commit()
    except BaseException:
        # Same reach as a bare except: always leave the session clean
        db.session.rollback()
        raise
def clear_scodoc_cache():
    """Clear ScoDoc cache.

    This cache (currently Redis) persists between invocations, so it may
    be necessary to clear it during development or tests.
    """
    import redis

    # Talk to redis directly, bypassing ScoDoc
    # (default connection settings of redis.Redis())
    redis.Redis().flushall()

    # Local caches must be cleared as well
    sco_preferences.clear_base_preferences()
from app.models import Departement
from app.scodoc import notesdb as ndb, sco_preferences
from app.scodoc import sco_cache
# admin_role = Role.query.filter_by(name="SuperAdmin").first()
# if admin_role:
# admin = (
# User.query.join(UserRole)
# .filter((UserRole.user_id == User.id) & (UserRole.role_id == admin_role.id))
# .first()
# )
# else:
# click.echo(
# "Warning: user database not initialized !\n (use: flask user-db-init)"
# )
# admin = None