Давайте рассмотрим несколько сценариев.
1. Выполнить задачу через N времени.
В файле main.py изменим вызов задачи:
call_background_task.apply_async(args=[message], countdown=60*5)
Самый простой - аргумент countdown - в переводе "обратный отсчёт". Он позволяет задать время в секундах, через которое задача станет доступна для выполнения.
Запустим сервер и вызовем конечную точку hello_world. Первым делом в консоли Celery мы увидим что наша задача была зарегистрирована:
[2024-05-10 09:33:55,413: INFO/MainProcess] Task task.call_background_task[aeeab66a-2cf7-409f-8512-85b351d70c10] received
И спустя 5 минут и 10 секунд мы увидим следующее:
[2024-05-10 09:39:05,377: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 09:39:05,380: WARNING/ForkPoolWorker-8] Test
[2024-05-10 09:39:05,384: INFO/ForkPoolWorker-8] Task task.call_background_task[aeeab66a-2cf7-409f-8512-85b351d70c10] succeeded in 10.007899875000021s: None
Данный способ не подойдет, если вы используете Redis в качестве брокера. Дело в том, что Redis помещает отложенные задачи в очередьunacked, из которой по истечение времени, указанного в аргументе VISIBILITY_TIMEOUT, задача будет назначена еще одному обработчику. Например, countdown у нас равен 120 минутам, а VISIBILITY_TIMEOUT по умолчанию 60. В таком случае есть риск, что задача будет назначена сразу трём обработчикам (первому сразу, второму через 60 минут, третьему - если задача через 120 минут будет еще в очереди). В результате, мы получим выполнение одной и той же задачи несколько раз. Подробнее в документации тут и тут.
Если вы используете Redis в качестве брокер, то параметрconsumer_timeout по умолчанию равен 30 минутам. Не желательно устанавливать countdown больше этого времени, иначе будет возбуждено исключение PRECONDITION_FAILED. Если есть такая необходимость, необходимо увеличить время в rabbitmq.conf. Подробнее - тут.
2) Выполнить задачу в определенное время
Эта ситуация очень похожа на предыдущую и мы могли бы использовать countdown. Но он лучше подходит для небольших промежутков времени - через минуту или пол часа. А для назначения задачи на конкретное время намного удобнее использовать аргумент eta. Он расшифровывается как Estimated Time of Arrival, что в переводе "Ожидаемое время прибытия".
Здесь есть две важных детали:
-
при использовании Redis отложенные с помощью
etaзадачи столкнутся с той же проблемой, что иcountdownиз-заVISIBILITY_TIMEOUT. -
eta- это не точное время, в которое будет выполнена задача. Указывая время, мы говорим Celery - "задача должна быть выполнена не раньше этого времени". Как только это время наступит - задача будет выполнена в порядке очереди и будет зависеть от количества задач в очереди.
@app.get("/")
async def hello_world(message: str):
task_datetime = datetime.now(timezone.utc) + timedelta(minutes=10)
call_background_task.apply_async(args=[message], eta=task_datetime)
return {'message': 'Hello World!'}
В данном случае мы говорим установить время выполнения задачи, время сейчас + 10 минут. Основное отличие от метода countdown в том, что в параметре eta мы должны указать дату и время.
Запустим сервер и проверим работу. Выполним запрос и увидим что наша задача была добавлена.
[2024-05-10 10:33:38,013: INFO/MainProcess] Task task.call_background_task[019bc540-f5d5-42d3-8436-16f49bb7ce1e] received
Спустя 10 минут мы увидим следующее:
[2024-05-10 10:43:47,921: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 10:43:47,923: WARNING/ForkPoolWorker-8] Test
[2024-05-10 10:43:47,928: INFO/ForkPoolWorker-8] Task task.call_background_task[019bc540-f5d5-42d3-8436-16f49bb7ce1e] succeeded in 10.009074000000055s: None
Мы видим что наша задача была выполнена в установленное время (спустя 10 минут).
3. Отмена выполнения задачи по истечение времени
Возможна ситуация, когда определенная задача теряет свою актуальность, если не выполнена в течение какого то времени. Рассмотрим на примере генерации отчёта. Пользователь отправил запрос на генерацию отчёта. Задача попала в очередь и за час ни один обработчик не смог её обработать. В таком случае нам можно отменить "просроченную" задачу. Для этого применяется аргументexpires. Он принимает либо число в секундах, либо объект datetime.
call_background_task.apply_async(args=[message], expires=3600)
В данном шаге мы рассмотрели основные возможности вызова задач. В следующем шаге мы познакомимся с периодическими заданиями в Celery.