6.5 Асинхронные задачи с Celery и Redis
7 из 7 шагов пройдено

Давайте рассмотрим несколько сценариев.

 

1. Выполнить задачу через N времени.

В файле main.py изменим вызов задачи:

call_background_task.apply_async(args=[message], countdown=60*5)

Самый простой - аргумент countdown - в переводе "обратный отсчёт". Он позволяет задать время в секундах, через которое задача станет доступна для выполнения.

Запустим сервер и вызовем конечную точку hello_world. Первым делом в консоли Celery мы увидим что наша задача была зарегистрирована:

[2024-05-10 09:33:55,413: INFO/MainProcess] Task task.call_background_task[aeeab66a-2cf7-409f-8512-85b351d70c10] received

И спустя 5 минут и 10 секунд мы увидим следующее:

[2024-05-10 09:39:05,377: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 09:39:05,380: WARNING/ForkPoolWorker-8] Test
[2024-05-10 09:39:05,384: INFO/ForkPoolWorker-8] Task task.call_background_task[aeeab66a-2cf7-409f-8512-85b351d70c10] succeeded in 10.007899875000021s: None

Данный способ не подойдет, если вы используете Redis в качестве брокера. Дело в том, что Redis помещает отложенные задачи в очередьunacked, из которой по истечение времени, указанного в аргументе VISIBILITY_TIMEOUT, задача будет назначена еще одному обработчику. Например, countdown у нас равен 120 минутам, а VISIBILITY_TIMEOUT по умолчанию 60. В таком случае есть риск, что задача будет назначена сразу трём обработчикам (первому сразу, второму через 60 минут, третьему - если задача через 120 минут будет еще в очереди). В результате, мы получим выполнение одной и той же задачи несколько раз. Подробнее в документации тут и тут.

Если вы используете Redis в качестве брокер, то параметрconsumer_timeout по умолчанию равен 30 минутам. Не желательно устанавливать countdown больше этого времени, иначе будет возбуждено исключение PRECONDITION_FAILED. Если есть такая необходимость, необходимо увеличить время в rabbitmq.conf. Подробнее - тут.

 

2) Выполнить задачу в определенное время

Эта ситуация очень похожа на предыдущую и мы могли бы использовать countdown. Но он лучше подходит для небольших промежутков времени - через минуту или пол часа. А для назначения задачи на конкретное время намного удобнее использовать аргумент eta. Он расшифровывается как Estimated Time of Arrival, что в переводе "Ожидаемое время прибытия".

Здесь есть две важных детали:

  • при использовании Redis отложенные с помощью eta задачи столкнутся с той же проблемой, что и countdown из-за VISIBILITY_TIMEOUT.

  • eta - это не точное время, в которое будет выполнена задача. Указывая время, мы говорим Celery - "задача должна быть выполнена не раньше этого времени". Как только это время наступит - задача будет выполнена в порядке очереди и будет зависеть от количества задач в очереди.

@app.get("/")
async def hello_world(message: str):
    task_datetime = datetime.now(timezone.utc) + timedelta(minutes=10)
    call_background_task.apply_async(args=[message], eta=task_datetime)
    return {'message': 'Hello World!'}

В данном случае мы говорим установить время выполнения задачи, время сейчас + 10 минут. Основное отличие от метода countdown в том, что в параметре eta мы должны указать дату и время.

Запустим сервер и проверим работу. Выполним запрос и увидим что наша задача была добавлена.

[2024-05-10 10:33:38,013: INFO/MainProcess] Task task.call_background_task[019bc540-f5d5-42d3-8436-16f49bb7ce1e] received

 Спустя 10 минут мы увидим следующее:

[2024-05-10 10:43:47,921: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 10:43:47,923: WARNING/ForkPoolWorker-8] Test
[2024-05-10 10:43:47,928: INFO/ForkPoolWorker-8] Task task.call_background_task[019bc540-f5d5-42d3-8436-16f49bb7ce1e] succeeded in 10.009074000000055s: None

Мы видим что наша задача была выполнена в установленное время (спустя 10 минут).

 

3. Отмена выполнения задачи по истечение времени

Возможна ситуация, когда определенная задача теряет свою актуальность, если не выполнена в течение какого то времени. Рассмотрим на примере генерации отчёта. Пользователь отправил запрос на генерацию отчёта. Задача попала в очередь и за час ни один обработчик не смог её обработать. В таком случае нам можно отменить "просроченную" задачу. Для этого применяется аргументexpires. Он принимает либо число в секундах, либо объект datetime.

call_background_task.apply_async(args=[message], expires=3600)

 

 

В данном шаге мы рассмотрели основные возможности вызова задач. В следующем шаге мы познакомимся с периодическими заданиями в Celery.


Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.
Нет обсуждений. Начните первое.