ScoDoc/tasks.py
2024-08-05 15:42:59 +02:00

45 lines
1003 B
Python

"""Essai Celery"""
import time
from celery import shared_task
from app import create_app
# Build the Flask application when this module is imported.
flask_app = create_app()
# Celery application object that the Flask app stores under its "celery"
# extension key.
# NOTE(review): presumably this is the object a Celery worker is pointed at
# when it loads this module -- confirm against the deployment command.
celery_app = flask_app.extensions["celery"]
@shared_task(ignore_result=False)
def long_running_task(iterations) -> int:
    """Sum the integers in ``range(iterations)``, pausing 2 s per step.

    Demonstration task: the pause only simulates slow work.

    Args:
        iterations: number of loop steps to perform (passed to ``range``).

    Returns:
        The sum ``0 + 1 + ... + (iterations - 1)``.
    """
    total = 0
    for step in range(iterations):
        total = total + step
        # Artificial delay standing in for real work.
        time.sleep(2)
    return total
@shared_task(ignore_result=False, bind=True)
def sco_example_task(self, duration: int = 10):
    """Example background task that reports its progress once per second.

    On each of the ``duration`` steps the task publishes a ``PROGRESS``
    state through ``self.update_state`` and then sleeps for one second.

    Args:
        duration (int): number of one-second steps to run.

    Returns:
        dict: final payload with the keys ``current``, ``total``,
        ``status``, ``total_duration`` and ``result``.
    """
    for elapsed in range(duration):
        progress = {
            "current": elapsed,
            "total_duration": duration,
            "status": "running",
        }
        self.update_state(state="PROGRESS", meta=progress)
        time.sleep(1)

    summary = {
        "current": 100,
        "total": 100,
        "status": "Task completed!",
        "total_duration": duration,
        "result": duration,
    }
    return summary