From 58a3b56e3780d8e3822d7d21005ea1f5bae81206 Mon Sep 17 00:00:00 2001 From: Emmanuel Viennet Date: Mon, 5 Aug 2024 15:42:59 +0200 Subject: [PATCH] WIP: Celery --- app/__init__.py | 26 ++++++++++++++++++++++++- app/views/scodoc.py | 21 +++++++++++++++++++++ config.py | 8 ++++++++ requirements-3.11.txt | 10 ++++++++++ tasks.py | 44 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 tasks.py diff --git a/app/__init__.py b/app/__init__.py index 97690884..fabcc989 100755 --- a/app/__init__.py +++ b/app/__init__.py @@ -15,6 +15,7 @@ from logging.handlers import SMTPHandler, WatchedFileHandler from threading import Thread import warnings +from celery import Celery, Task from flask import current_app, g, request from flask import Flask from flask import abort, flash, has_request_context @@ -270,13 +271,36 @@ class ReverseProxied(object): return self.app(environ, start_response) -def create_app(config_class=DevConfig): +def celery_init_app(app: Flask) -> Celery: + """Initialize Celery. + See https://flask.palletsprojects.com/en/latest/patterns/celery + """ + + class FlaskTask(Task): + "Task in Flask context" + + def __call__(self, *args: object, **kwargs: object) -> object: + with app.app_context(): + return self.run(*args, **kwargs) + + celery_app = Celery(app.name, task_cls=FlaskTask) + celery_app.config_from_object(app.config["CELERY"]) + celery_app.set_default() + app.extensions["celery"] = celery_app + return celery_app + + +def create_app(config_class=DevConfig) -> Flask: + """Create Flask application""" app = Flask(__name__, static_url_path="/ScoDoc/static", static_folder="static") app.config.from_object(config_class) from app.auth import cas CAS(app, url_prefix="/cas", configuration_function=cas.set_cas_configuration) app.wsgi_app = ReverseProxied(app.wsgi_app) + # --- Celery + celery_init_app(app) + # --- JSON app_json = FlaskJSON(app) @app_json.encoder diff --git a/app/views/scodoc.py b/app/views/scodoc.py index f7893132..0a6ee9e3 100644 --- a/app/views/scodoc.py +++ b/app/views/scodoc.py @@ -746,3 +746,24 @@ def ue_colors_css(formation_id: int, semestre_idx: int): # current_app.logger.critical(f"testlog message CRITICAL") # raise SyntaxError("une erreur de syntaxe") # return "testlog completed at " + str(time.time()) + + +@bp.route("/example_task_run/") +def run_example_task(duration: int): + """Lance tâche de fond + http://deb12.viennet.net:5000/example_task_run/60 + """ + from tasks import sco_example_task + + task = sco_example_task.apply_async(duration=duration) + return redirect(url_for("scodoc.example_task_status", task_id=task.id)) + + +@bp.route("/example_task_status/") +def example_task_status(task_id): + """Status tâche de fond""" + from tasks import sco_example_task + + task = sco_example_task.AsyncResult(task_id) + + return f"Task {task_id}: state={task.state} {task.info.get('current', '?')} / {task.info.get('total_duration', '?')} ({task.info.get('status', '?')})" diff --git a/config.py b/config.py index d98e9513..23d03b15 100755 --- a/config.py +++ b/config.py @@ -43,6 +43,14 @@ class Config: JSON_USE_ENCODE_METHODS = True JSON_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z" # "%Y-%m-%dT%H:%M:%S" + # Celery + CELERY = { + "broker_connection_retry_on_startup": True, + "broker_url": os.environ.get("CELERY_BROKER_URL") or "redis://localhost:6379/0", + "result_backend": os.environ.get("CELERY_RESULT_BACKEND") + or "redis://localhost:6379/0", + } + class ProdConfig(Config): "mode production, normalement derrière nginx/gunicorn" diff --git a/requirements-3.11.txt b/requirements-3.11.txt index 825c1635..a684f203 100644 --- a/requirements-3.11.txt +++ b/requirements-3.11.txt @@ -1,17 +1,23 @@ alembic==1.13.2 +amqp==5.2.0 astroid==3.2.4 async-timeout==4.0.3 attrs==23.2.0 Babel==2.15.0 +billiard==4.2.0 black==24.4.2 blinker==1.8.2 Brotli==1.1.0 cachelib==0.9.0 +celery==5.4.0 certifi==2024.7.4 cffi==1.16.0 chardet==5.2.0 charset-normalizer==3.3.2 click==8.1.7 +click-didyoumean==0.3.1 +click-plugins==1.1.1 +click-repl==0.3.0 cracklib==2.9.6 cryptography==43.0.0 cssselect2==0.7.0 @@ -48,6 +54,7 @@ iniconfig==2.0.0 isort==5.13.2 itsdangerous==2.2.0 Jinja2==3.1.4 +kombu==5.3.7 lazy-object-proxy==1.10.0 lxml==5.2.2 Mako==1.3.5 @@ -64,6 +71,7 @@ pathspec==0.12.1 pillow==10.4.0 platformdirs==4.2.2 pluggy==1.5.0 +prompt_toolkit==3.0.47 psycopg2==2.9.9 puremagic==1.26 py==1.11.0 @@ -105,7 +113,9 @@ tuna==0.5.11 typing_extensions==4.12.2 tzdata==2024.1 urllib3==2.2.2 +vine==5.1.0 visitor==0.1.3 +wcwidth==0.2.13 weasyprint==62.3 webencodings==0.5.1 Werkzeug==3.0.3 diff --git a/tasks.py b/tasks.py new file mode 100644 index 00000000..0f245b6f --- /dev/null +++ b/tasks.py @@ -0,0 +1,44 @@ +"""Essai Celery""" + +import time +from celery import shared_task + +from app import create_app + +flask_app = create_app() # -Line 2 +celery_app = flask_app.extensions["celery"] # -Line 3 + + +@shared_task(ignore_result=False) # -Line 4 +def long_running_task(iterations) -> int: # -Line 5 + result = 0 + for i in range(iterations): + result += i + time.sleep(2) + return result # -Line 6 + + +@shared_task(ignore_result=False, bind=True) +def sco_example_task(self, duration: int = 10): + """Example task to run in background. + + Args: + duration (int): duration in seconds + + Returns: + int: result + """ + for i in range(duration): + self.update_state( + state="PROGRESS", + meta={"current": i, "total_duration": duration, "status": "running"}, + ) + time.sleep(1) + + return { + "current": 100, + "total": 100, + "status": "Task completed!", + "total_duration": duration, + "result": duration, + }