forked from ScoDoc/ScoDoc
45 lines
1003 B
Python
45 lines
1003 B
Python
"""Essai Celery"""
|
|
|
|
import time
|
|
from celery import shared_task
|
|
|
|
from app import create_app
|
|
|
|
flask_app = create_app() # -Line 2
|
|
celery_app = flask_app.extensions["celery"] # -Line 3
|
|
|
|
|
|
@shared_task(ignore_result=False) # -Line 4
|
|
def long_running_task(iterations) -> int: # -Line 5
|
|
result = 0
|
|
for i in range(iterations):
|
|
result += i
|
|
time.sleep(2)
|
|
return result # -Line 6
|
|
|
|
|
|
@shared_task(ignore_result=False, bind=True)
|
|
def sco_example_task(self, duration: int = 10):
|
|
"""Example task to run in background.
|
|
|
|
Args:
|
|
duration (int): duration in seconds
|
|
|
|
Returns:
|
|
int: result
|
|
"""
|
|
for i in range(duration):
|
|
self.update_state(
|
|
state="PROGRESS",
|
|
meta={"current": i, "total_duration": duration, "status": "running"},
|
|
)
|
|
time.sleep(1)
|
|
|
|
return {
|
|
"current": 100,
|
|
"total": 100,
|
|
"status": "Task completed!",
|
|
"total_duration": duration,
|
|
"result": duration,
|
|
}
|