# -*- coding: UTF-8 -*
# pylint: disable=invalid-name
"""Flask application package: extensions, error handlers, logging and app factory."""

import base64
import datetime
import json
import os
import socket
import sys
import time
import traceback
import logging
from logging.handlers import SMTPHandler, WatchedFileHandler
from threading import Thread
import warnings

from flask import current_app, g, request
from flask import Flask
from flask import abort, flash, has_request_context
from flask import render_template
from flask.logging import default_handler
from flask_caching import Cache
from flask_json import FlaskJSON, json_response
from flask_login import LoginManager, current_user
from flask_mail import Mail
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy
from jinja2 import select_autoescape
import numpy as np
import psycopg2
from psycopg2.extensions import AsIs as psycopg2_AsIs
import sqlalchemy as sa
import werkzeug.debug
from wtforms.fields import HiddenField
from flask_cas import CAS

from app.scodoc.sco_exceptions import (
    AccessDenied,
    ScoBugCatcher,
    ScoException,
    ScoGenError,
    ScoInvalidCSRF,
    ScoPDFFormatError,
    ScoValueError,
    APIInvalidParams,
)
from app.scodoc.sco_vdi import ApoEtapeVDI
from config import DevConfig
import sco_version

# Shared extension instances, created unbound at import time.
db = SQLAlchemy()
migrate = Migrate(compare_type=True)
login = LoginManager()
# Endpoint and message used when an anonymous user hits a protected page.
login.login_view = "auth.login"
login.login_message = "Identifiez-vous pour accéder à cette page."
mail = Mail()

CACHE_TYPE = os.environ.get("CACHE_TYPE")
cache = Cache(
    config={
        # see https://flask-caching.readthedocs.io/en/latest/index.html#configuring-flask-caching
        "CACHE_TYPE": CACHE_TYPE or "RedisCache",
        # by default, never expire.
        # Environment values are strings: convert so the backend gets a number.
        "CACHE_DEFAULT_TIMEOUT": int(os.environ.get("CACHE_DEFAULT_TIMEOUT") or 0),
    }
)


# NumPy & Psycopg2 (necessary with Numpy 2.0)
# will probably have to change when moving to psycopg3.2
def adapt_numpy_scalar(numpy_scalar):
    """Adapt numeric types for psycopg2.

    NaN values are emitted as the quoted SQL literal 'NaN'; any other
    scalar is passed through as is.
    """
    return psycopg2_AsIs(numpy_scalar if not np.isnan(numpy_scalar) else "'NaN'")


psycopg2.extensions.register_adapter(np.float32, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.float64, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.int32, adapt_numpy_scalar)
psycopg2.extensions.register_adapter(np.int64, adapt_numpy_scalar)


def handle_sco_value_error(exc):
    "Error page with a message (HTTP 404)"
    return render_template("sco_value_error.j2", exc=exc), 404


def handle_access_denied(exc):
    "Access denied page (HTTP 403)"
    return render_template("error_access_denied.j2", exc=exc), 403


def handle_invalid_csrf(exc):
    """Form submit with invalid CSRF token"""
    # logout user and go back to login page with an error message
    from app import auth

    auth.logic.logout()
    return render_template("error_csrf.j2", exc=exc), 404


# PDF format errors are rendered with the same page as value errors.
handle_pdf_format_error = handle_sco_value_error


def internal_server_error(exc):
    """ScoDoc bugs, errors 500.

    Clears all caches, then renders the error page with the current
    traceback (base64-encoded) and the request URL.
    """
    # note that we set the 500 status explicitly
    from app.scodoc import sco_utils as scu

    # Invalidate all caches
    log("internal_server_error: clearing caches")
    clear_scodoc_cache()

    return (
        render_template(
            "error_500.j2",
            SCOVERSION=sco_version.SCOVERSION,
            date=datetime.datetime.now().isoformat(),
            exc=exc,
            traceback_str_base64=base64.urlsafe_b64encode(
                traceback.format_exc().encode(scu.SCO_ENCODING)
            ).decode(scu.SCO_ENCODING),
            request_url=request.url,
            scu=scu,
        ),
        500,
    )


def handle_sco_bug(exc):
    """A (usually rare) bug for which the developers are gathering
    information in order to fix it.

    In TESTING or DEBUG mode, raises ScoException. Otherwise starts a
    background thread running `_async_dump` and returns the error 500 page.
    """
    if current_app.config["TESTING"] or current_app.config["DEBUG"]:
        raise ScoException  # for development servers only
    else:
        # Pass the real app object: the current_app proxy is not usable
        # from another thread.
        Thread(
            target=_async_dump, args=(current_app._get_current_object(), request.url)
        ).start()

    return internal_server_error(exc)


def _async_dump(app, request_url: str):
    """Thread target: open a db connection and call sco_dump_and_send_db
    within an application context."""
    from app.scodoc.sco_dump_db import sco_dump_and_send_db

    with app.app_context():
        ndb.open_db_connection()
        try:
            sco_dump_and_send_db("ScoBugCatcher", request_url=request_url)
        except ScoValueError:
            # best effort: a failed dump must not raise in the thread
            pass


def handle_invalid_usage(error):
    "JSON response built from the error's dict and status code"
    response = json_response(data_=error.to_dict())
    response.status_code = error.status_code
    return response


# JSON ENCODING
# used by some internal functions
# the API is now using flask_json, NOT THIS ENCODER
class ScoDocJSONEncoder(json.JSONEncoder):
    "JSON encoder: dates as ISO strings, ApoEtapeVDI as str"

    def default(self, o):  # pylint: disable=E0202
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()
        elif isinstance(o, ApoEtapeVDI):
            return str(o)
        else:
            return json.JSONEncoder.default(self, o)


def render_raw_html(template_filename: str, **args) -> str:
    """Load and render an HTML file _without_ using Flask
    Necessary for 503 error message, when DB is down and Flask may be broken.

    The file content is formatted with str.format(**args).
    """
    template_path = os.path.join(
        current_app.config["SCODOC_DIR"],
        "app",
        current_app.template_folder,
        template_filename,
    )
    # Explicit encoding: do not depend on the process locale.
    with open(template_path, encoding="utf-8") as f:
        txt = f.read().format(**args)
    return txt


def postgresql_server_error(e):
    """Connection error to the postgresql server
    (see notesdb.open_db_connection)"""
    return render_raw_html("error_503.j2", SCOVERSION=sco_version.SCOVERSION), 503


class LogRequestFormatter(logging.Formatter):
    """Add URL, remote_addr, user and admin mail to log records"""

    def format(self, record):
        if has_request_context():
            record.url = request.url
            record.remote_addr = request.remote_addr
            record.sco_admin_mail = current_app.config["SCODOC_ADMIN_MAIL"]
        else:
            record.url = None
            record.remote_addr = None
            record.sco_admin_mail = "(pas de requête)"
        record.sco_user = current_user

        return super().format(record)


class LogExceptionFormatter(logging.Formatter):
    """Formatter for exceptions: adds request details"""

    def format(self, record):
        if has_request_context():
            record.url = request.url
            # Prefer the forwarded address when the header is present
            record.remote_addr = request.environ.get(
                "HTTP_X_FORWARDED_FOR", request.remote_addr
            )
            record.http_referrer = request.referrer
            record.http_method = request.method
            if request.method == "GET":
                record.http_params = str(request.args)
            else:
                # truncate form data
                record.http_params = str(request.form)[:2048]
            record.sco_admin_mail = current_app.config["SCODOC_ADMIN_MAIL"]
        else:
            record.url = None
            record.remote_addr = None
            record.http_referrer = None
            record.http_method = None
            record.http_params = None
            record.sco_admin_mail = "(pas de requête)"
        record.sco_user = current_user

        return super().format(record)


class ScoSMTPHandler(SMTPHandler):
    "SMTP log handler with a subject naming the exception and where it was raised"

    def getSubject(self, record: logging.LogRecord) -> str:
        """Subject of the mail for this record.

        With exception info: exception class name, then name and file of the
        innermost traceback frame. Records logged without an exception (or
        with an empty traceback) get a subject built from what is available
        instead of failing.
        """
        prefix = f"ScoExc({sco_version.SCOVERSION})"
        exc_info = record.exc_info
        if not exc_info or exc_info[0] is None:
            # e.g. logger.error("...") called outside an except block
            return (
                f"{prefix}: {record.levelname} in {record.funcName} {record.pathname}"
            )
        stack_summary = traceback.extract_tb(exc_info[2])
        if not stack_summary:
            return f"{prefix}: {exc_info[0].__name__}"
        frame_summary = stack_summary[-1]
        subject = f"{prefix}: {exc_info[0].__name__} in {frame_summary.name} {frame_summary.filename}"

        return subject
class ReverseProxied:
    """WSGI adapter so that all URLs are computed as https, except in
    development. The HTTP_X_FORWARDED_PROTO variable is set by our nginx
    configuration."""

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        scheme = environ.get("HTTP_X_FORWARDED_PROTO")
        if scheme:
            environ["wsgi.url_scheme"] = scheme  # or force https here ?
        return self.app(environ, start_response)


def create_app(config_class=DevConfig):
    """Application factory.

    Creates the Flask app from `config_class`, initializes the extensions,
    registers error handlers and blueprints, configures logging and
    registers the bulletin generator classes.

    Returns the configured Flask application.
    """
    app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static")
    app.config.from_object(config_class)
    from app.auth import cas

    CAS(app, url_prefix="/cas", configuration_function=cas.set_cas_configuration)
    app.wsgi_app = ReverseProxied(app.wsgi_app)
    app_json = FlaskJSON(app)

    @app_json.encoder
    def scodoc_json_encoder(o):
        "Override default date encoding (RFC 822) and use ISO"
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()

    # Keep the order of objects in JSON:
    # e.g. the order of UEs in bulletins
    app.json.sort_keys = False

    # Avoid logging every request in our log
    logging.getLogger("werkzeug").disabled = True
    app.logger.setLevel(app.config["LOG_LEVEL"])
    if app.config["TESTING"] or app.config["DEBUG"]:
        # Stop on all warnings, except
        # flask_sqlalchemy/query (deprecation issue with model.get())
        # NOTE(review): the action is "error" and `module` is a regex matched
        # against the module name; with a "/" it may never match, which does
        # not agree with the comment above. Confirm the intent.
        warnings.filterwarnings("error", module="flask_sqlalchemy/query")
    if app.config["DEBUG"]:
        # since werkzeug logs are disabled above,
        # we print the PIN ourselves in debug mode:
        print(
            f""" * Debugger is active!
 * Debugger PIN: {werkzeug.debug.get_pin_and_cookie_name(app)[0]}
 """
        )
    # Check/create the symlink used for static URLs
    link_filename = f"{app.root_path}/static/links/{sco_version.SCOVERSION}"
    if not os.path.exists(link_filename):
        app.logger.info(f"creating symlink {link_filename}")
        os.symlink("..", link_filename)

    db.init_app(app)
    migrate.init_app(app, db)
    login.init_app(app)
    mail.init_app(app)
    app.extensions["mail"].debug = 0  # disable copy of mails to stderr
    cache.init_app(app)
    sco_cache.CACHE = cache
    if CACHE_TYPE:  # non default
        app.logger.info(f"CACHE_TYPE={CACHE_TYPE}")

    app.register_error_handler(ScoGenError, handle_sco_value_error)
    app.register_error_handler(ScoValueError, handle_sco_value_error)
    app.register_error_handler(ScoBugCatcher, handle_sco_bug)
    app.register_error_handler(ScoInvalidCSRF, handle_invalid_csrf)
    app.register_error_handler(ScoPDFFormatError, handle_pdf_format_error)
    app.register_error_handler(AccessDenied, handle_access_denied)
    app.register_error_handler(500, internal_server_error)
    app.register_error_handler(503, postgresql_server_error)
    app.register_error_handler(APIInvalidParams, handle_invalid_usage)

    from app.auth import bp as auth_bp

    app.register_blueprint(auth_bp, url_prefix="/auth")

    from app.entreprises import bp as entreprises_bp

    app.register_blueprint(entreprises_bp, url_prefix="/ScoDoc/entreprises")

    from app.views import scodoc_bp
    from app.views import scolar_bp
    from app.views import notes_bp
    from app.views import users_bp
    from app.views import absences_bp
    from app.views import assiduites_bp
    from app.api import api_bp
    from app.api import api_web_bp

    # Jinja2 configuration
    # Enable autoescaping of all templates, including .j2
    app.jinja_env.autoescape = select_autoescape(default_for_string=True, default=True)
    app.jinja_env.trim_blocks = True
    app.jinja_env.lstrip_blocks = True
    app.jinja_env.globals["is_hidden_field"] = lambda field: isinstance(
        field, HiddenField
    )

    # https://scodoc.fr/ScoDoc
    app.register_blueprint(scodoc_bp)
    # Department-scoped blueprints: the department acronym is a URL variable.
    # An empty segment ("/ScoDoc//...") would not carry the department and
    # would make api_web_bp collide with api_bp.
    # NOTE(review): variable name taken from set_sco_dept(); confirm that the
    # views expect `scodoc_dept`.
    # https://scodoc.fr/ScoDoc/RT/Scolarite/...
    app.register_blueprint(scolar_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite")
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Notes/...
    app.register_blueprint(
        notes_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Notes"
    )
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Users/...
    app.register_blueprint(
        users_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Users"
    )
    # https://scodoc.fr/ScoDoc/RT/Scolarite/Absences/...
    app.register_blueprint(
        absences_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Absences"
    )
    app.register_blueprint(
        assiduites_bp, url_prefix="/ScoDoc/<scodoc_dept>/Scolarite/Assiduites"
    )
    app.register_blueprint(api_bp, url_prefix="/ScoDoc/api")
    app.register_blueprint(api_web_bp, url_prefix="/ScoDoc/<scodoc_dept>/api")

    scodoc_log_formatter = LogRequestFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s"
    )
    # the additional fields are set by the formatter classes
    scodoc_exc_formatter = LogExceptionFormatter(
        "[%(asctime)s] %(sco_user)s@%(remote_addr)s requested %(url)s\n"
        "%(levelname)s: %(message)s\n"
        "Referrer: %(http_referrer)s\n"
        "Method: %(http_method)s\n"
        "Params: %(http_params)s\n"
        "Admin mail: %(sco_admin_mail)s\n"
    )
    # NOTE(review): nesting below reconstructed from the comments (file logs
    # for DEV and PRODUCTION, none when testing) - confirm.
    if not app.testing:
        if not app.debug:
            # --- Log configuration for PRODUCTION
            # Remove the default logger, which writes to stderr and
            # pollutes the system logs
            app.logger.removeHandler(default_handler)
            # --- Mail for ERROR and CRITICAL messages
            if app.config["MAIL_SERVER"]:
                auth = None
                if app.config["MAIL_USERNAME"] or app.config["MAIL_PASSWORD"]:
                    auth = (app.config["MAIL_USERNAME"], app.config["MAIL_PASSWORD"])
                secure = None
                if app.config["MAIL_USE_TLS"]:
                    secure = ()
                mail_handler = ScoSMTPHandler(
                    mailhost=(app.config["MAIL_SERVER"], app.config["MAIL_PORT"]),
                    fromaddr=app.config["SCODOC_MAIL_FROM"],
                    toaddrs=["exception@scodoc.org"],
                    subject="ScoDoc Exception",  # unused see ScoSMTPHandler
                    credentials=auth,
                    secure=secure,
                )
                mail_handler.setFormatter(scodoc_exc_formatter)
                mail_handler.setLevel(logging.ERROR)
                app.logger.addHandler(mail_handler)
        else:
            # For logs in DEV only:
            default_handler.setFormatter(scodoc_log_formatter)

        # Log configuration for DEV and PRODUCTION
        # (also active in development mode)
        # usually /opt/scodoc-data/log/scodoc.log:
        # rotated by logrotate
        file_handler = WatchedFileHandler(
            app.config["SCODOC_LOG_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_log_formatter)
        file_handler.setLevel(logging.INFO)
        app.logger.addHandler(file_handler)
        # Log for errors (exceptions) only:
        # usually /opt/scodoc-data/log/scodoc_exc.log
        file_handler = WatchedFileHandler(
            app.config["SCODOC_ERR_FILE"], encoding="utf-8"
        )
        file_handler.setFormatter(scodoc_exc_formatter)
        file_handler.setLevel(logging.ERROR)
        app.logger.addHandler(file_handler)

        app.logger.info(f"{sco_version.SCONAME} {sco_version.SCOVERSION} startup")
        app.logger.info(
            f"create_app({config_class.__name__}, {config_class.SQLALCHEMY_DATABASE_URI})"
        )
    # ---- SCODOC-SPECIFIC INITIALIZATION
    from app.scodoc import sco_bulletins_generator
    from app.scodoc.sco_bulletins_legacy import BulletinGeneratorLegacy
    from app.scodoc.sco_bulletins_standard import BulletinGeneratorStandard
    from app.but.bulletin_but_pdf import BulletinGeneratorStandardBUT
    from app.scodoc.sco_bulletins_ucac import BulletinGeneratorUCAC

    # order matters: the first one is the "default" for new departments.
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandard)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorStandardBUT)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorLegacy)
    sco_bulletins_generator.register_bulletin_class(BulletinGeneratorUCAC)
    if app.testing or app.debug:
        from app.scodoc.sco_bulletins_example import BulletinGeneratorExample

        sco_bulletins_generator.register_bulletin_class(BulletinGeneratorExample)

    from app.auth.cas import set_cas_configuration

    with app.app_context():
        try:
            set_cas_configuration(app)
        except sa.exc.ProgrammingError:
            # If the database has not been upgraded (happens during install)
            # the scodoc_site_config table may not exist yet.
            pass
    return app


def set_sco_dept(scodoc_dept: str, open_cnx=True):
    """Set global g object to given dept and open db connection if needed.

    Aborts with 503 if the department query raises an OperationalError;
    raises ScoValueError if no department has this acronym.
    """
    # Check that dept exists
    try:
        dept = Departement.query.filter_by(acronym=scodoc_dept).first()
    except sa.exc.OperationalError:
        abort(503)

    if not dept:
        raise ScoValueError(f"Invalid dept: {scodoc_dept}")
    g.scodoc_dept = scodoc_dept  # the acronym
    g.scodoc_dept_id = dept.id  # the id
    if open_cnx and not hasattr(g, "db_conn"):
        ndb.open_db_connection()
    if not hasattr(g, "stored_get_formsemestre"):
        g.stored_get_formsemestre = {}


def user_db_init():
    """Initialize the users database.
    Check that basic roles and admin user exist.
    """
    from app.auth.models import User, Role

    current_app.logger.info("Init User's db")
    # Create roles:
    Role.reset_standard_roles_permissions()
    current_app.logger.info("created initial roles")
    # Ensure that admin exists
    admin_mail = current_app.config.get("SCODOC_ADMIN_MAIL")
    if admin_mail:
        admin_user_name = current_app.config["SCODOC_ADMIN_LOGIN"]
        user = User.query.filter_by(user_name=admin_user_name).first()
        if not user:
            user = User(user_name=admin_user_name, email=admin_mail)
            try:
                db.session.add(user)
                db.session.commit()
            except BaseException:  # roll back on any failure, then re-raise
                db.session.rollback()
                raise
            current_app.logger.info(
                "created initial admin user, login: {u.user_name}, email: {u.email}".format(
                    u=user
                )
            )


def sco_db_insert_constants():
    """Initialize Sco database: insert some constants (modalités, ...)."""
    from app import models

    current_app.logger.info("Init Sco db")
    # Modalités:
    models.FormationModalite.insert_modalites()


def initialize_scodoc_database(erase=False, create_all=False):
    """Initialize the database for unit tests
    Starts from an existing database and create all
    necessary SQL tables and functions.
    If erase is True, _erase_ all database content.
    If create_all is True, create the tables with db.create_all().
    """
    # - ERASE (uses a stored SQL function, see truncate_database)
    if erase:
        truncate_database()
    # - Create all tables
    if create_all:
        # managed by migrations, except for TESTS
        db.create_all()
    # - Insert initial roles and create super-admin user
    user_db_init()
    # - Insert some constant values (modalites, ...)
    sco_db_insert_constants()
    # - Flush cache
    clear_scodoc_cache()


def truncate_database():
    """Erase content of all tables (including users !) from
    the current database, then restart all SQL sequences.
    """
    # use a stored SQL function, see createtables.sql
    try:
        db.session.execute(sa.text("SELECT truncate_tables('scodoc');"))
        db.session.commit()
    except BaseException:  # roll back on any failure, then re-raise
        db.session.rollback()
        raise
    # Reset the counters (SQL sequences)
    db.session.execute(
        sa.text(
            """
    CREATE OR REPLACE FUNCTION reset_sequences(username IN VARCHAR) RETURNS void AS $$
    DECLARE
        statements CURSOR FOR
            SELECT sequence_name
            FROM information_schema.sequences
            ORDER BY sequence_name ;
    BEGIN
        FOR stmt IN statements LOOP
            EXECUTE 'ALTER SEQUENCE ' || quote_ident(stmt.sequence_name) || ' RESTART;';
        END LOOP;
    END;
    $$ LANGUAGE plpgsql;

    SELECT reset_sequences('scodoc');
    """
        )
    )
    db.session.commit()


def clear_scodoc_cache():
    """Clear ScoDoc cache
    This cache (currently Redis) is persistent between invocation
    and it may be necessary to clear it during developement or tests.
    """
    # talk to redis directly, bypassing ScoDoc:
    import redis

    r = redis.Redis()
    r.flushall()
    # Also clear local caches:
    sco_preferences.clear_base_preferences()


# --------- Logging
def log(msg: str):
    """log a message.
    If Flask app, use configured logger, else stderr.
    The message is prefixed with the current department when available.
    """
    try:
        dept = getattr(g, "scodoc_dept", "")
        msg = f" ({dept}) {msg}"
    except RuntimeError:
        # Flask Working outside of application context.
        pass
    if current_app and not current_app.config["DEBUG"]:
        current_app.logger.info(msg)
    else:
        sys.stdout.flush()
        sys.stderr.write(
            f"""[{time.strftime("%a %b %d %H:%M:%S %Y")}] scodoc: {msg}\n"""
        )
        sys.stderr.flush()


# Debug: log call stack
def log_call_stack():
    "Log the current call stack (without this function's own frame)"
    log("Call stack:\n" + "\n".join(x.strip() for x in traceback.format_stack()[:-1]))


# Alarms by email:
def send_scodoc_alarm(subject, txt):
    "Send an alarm mail with the given subject and text"
    from app import email

    sender = email.get_from_addr()
    email.send_email(subject, sender, ["exception@scodoc.org"], txt)


# NOTE(review): kept at this position, after the definitions above;
# presumably to avoid circular imports - confirm before moving.
from app.models import Departement
from app.scodoc import notesdb as ndb, sco_preferences
from app.scodoc import sco_cache


def scodoc_flash_status_messages():
    """Should be called on each page: flash messages indicating specific ScoDoc status"""
    email_test_mode_address = sco_preferences.get_preference("email_test_mode_address")
    if email_test_mode_address:
        flash(
            f"Mode test: mails redirigés vers {email_test_mode_address}",
            category="warning",
        )


def critical_error(msg):
    """Handle a critical error: log it, send an alarm mail, flush all caches,
    then raise ScoValueError to display the message to the user"""
    import app.scodoc.sco_utils as scu

    log(f"\n*** CRITICAL ERROR: {msg}")
    subject = f"CRITICAL ERROR: {msg}".strip()[:68]
    send_scodoc_alarm(subject, msg)
    clear_scodoc_cache()
    raise ScoValueError(
        f"""
    Une erreur est survenue, veuillez ré-essayer.

    {msg}
    """
    )