6.5 Асинхронные задачи с Celery и Redis
7 из 7 шагов пройдено

Чтобы создавать периодические задачи, нам нужно определить их с помощью параметра beat_schedule. Celery Beat проверяет параметр beat_schedule для управления задачами,  

from fastapi import FastAPI
from celery import Celery
from task import call_background_task

app = FastAPI()

celery = Celery(
    __name__,
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0',
    broker_connection_retry_on_startup=True
)



@app.get("/")
async def hello_world(message: str):
    call_background_task.apply_async(args=[message], expires=3600)
    return {'message': 'Hello World!'}


celery.conf.beat_schedule = {
    'run-me-background-task': {
        'task': 'task.call_background_task',
        'schedule': 60.0,
        'args': ('Test text message',)
    }
}

Мы говорим beat_schedule запускать функцию call_background_task() каждые 60 секунд. Функция для запуска идентифицируется <имя модуля>.<имя функции>. 

Если мы запустим это с помощью следующей команды:

celery -A main.celery beat --loglevel=info

В консоли мы увидим несколько строк вывода, которые каждые 60 секунд будут запускать наше задание.

[2024-05-10 11:01:29,028: INFO/MainProcess] Scheduler: Sending due task run-me-background-task (task.call_background_task)
[2024-05-10 11:02:29,027: INFO/MainProcess] Scheduler: Sending due task run-me-background-task (task.call_background_task)
[2024-05-10 11:03:29,027: INFO/MainProcess] Scheduler: Sending due task run-me-background-task (task.call_background_task)

task.call_background_task запускается каждые 60 секунд, но он на самом деле выполняется? Как было сказано ранее, Celery использует настроенный брокер сообщений для отправки и получения сообщений, в нашем примере Redis является брокером сообщений. Celery Beat отправляет задачу task.call_background_task в очередь Redis каждые 60 секунд.

Обработчики получают сообщения, извлекая их из очереди. В настоящий момент мы видим, что Celery Beat периодически добавляет задачи в очередь. Мы хотим, чтобы обработчики выполнили задачи.

В другом терминале запускаем нашего обработчика (worker):

celery -A main.celery worker --loglevel=info

И теперь уже в консоли мы увидим выполнение заданий:

[2024-05-10 11:01:29,031: INFO/MainProcess] Task task.call_background_task[f0c6f5be-49b9-45e3-968c-ab1bff1db51e] received
[2024-05-10 11:01:39,035: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 11:01:39,038: WARNING/ForkPoolWorker-8] Test text message
[2024-05-10 11:01:39,042: INFO/ForkPoolWorker-8] Task task.call_background_task[f0c6f5be-49b9-45e3-968c-ab1bff1db51e] succeeded in 10.00885904200004s: None
[2024-05-10 11:02:29,034: INFO/MainProcess] Task task.call_background_task[dd2d054b-d6b1-4649-bd17-09bca7f5f2e9] received
[2024-05-10 11:02:39,039: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 11:02:39,043: WARNING/ForkPoolWorker-8] Test text message
[2024-05-10 11:02:39,046: INFO/ForkPoolWorker-8] Task task.call_background_task[dd2d054b-d6b1-4649-bd17-09bca7f5f2e9] succeeded in 10.009045499999957s: None
[2024-05-10 11:03:29,034: INFO/MainProcess] Task task.call_background_task[677399cd-3d46-4fd5-b096-ab6aab1121ef] received
[2024-05-10 11:03:36,352: INFO/MainProcess] Task task.call_background_task[64f8eddf-b82c-48c8-be99-45dae3e77a2f] received
[2024-05-10 11:03:39,039: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 11:03:39,041: WARNING/ForkPoolWorker-8] Test text message
[2024-05-10 11:03:39,057: INFO/ForkPoolWorker-8] Task task.call_background_task[677399cd-3d46-4fd5-b096-ab6aab1121ef] succeeded in 10.019484375000047s: None

Как видим, обработчик получил задачу и вскоре она выполнена. Это простейший пример, но в реальном приложении время выполнения задачи может быть совсем иным.

Другой способ работы с запланированной задачей - использовать Crontab Schedules из Celery Вeat. Если нам нужно больше контроля над задачей для выполнения, мы можем использовать:

celery.conf.beat_schedule = {
    'run-me-background-task': {
        'task': 'task.call_background_task',
        'schedule': crontab(hour=7, minute=0),
        'args': ('Test text message',)
    }
}

Здесь мы указываем Celery запускать task.call_background_task каждый день в 7:00.

Вы также можете указать различные расписания Celery Crontab:

Пример

Значение

crontab()

Выполняется каждую минуту.

crontab(minute=0, hour=0)

Выполняется ежедневно в полночь.

crontab(minute=0, hour='*/3')

Выполняется каждые три часа: полночь, 3 утра, 6 утра, 9 утра, полдень, 15:00, 6 вечера, 9 вечера.

crontab(minute=0,

hour='0,3,6,9,12,15,18,21')

То же, что и в прошлом.

crontab(minute='*/15')

Выполняется каждые 15 минут.

crontab(day_of_week='sunday')

Выполняется каждую минуту (!) по воскресеньям.

crontab(minute='*',

hour='*',day_of_week='sun')

То же, что и в прошлом.

crontab(minute='*/10',

hour='3,17,22',day_of_week='thu,fri')

Выполняется каждые десять минут, но только между 3–4 часами утра, 5–6 вечера и 22–23 часами по четвергам и пятницам.

crontab(minute=0,hour='*/2,*/3')

Выполняется в каждый чётный час и в каждый час делимый на три. Это означает: каждый час, кроме: 1:00, 5:00, 7:00, 11:00, 13:00, 17:00, 19:00, 23:00

crontab(minute=0, hour='*/5')

Выполняется в каждый час, делимый на 5.

crontab(minute=0,hour='*/3,8-17')

Выполняется в каждый час, делимый на 3, и каждый час в рабочее время (с 8:00 до 17:00).

crontab(0, 0,day_of_month='2')

Выполняется во второй день каждого месяца.

crontab(0, 0,

day_of_month='2-30/2')

Выполняется в каждый чётный день месяца.

crontab(0, 0,

day_of_month='1-7,15-21')

Выполняется в первую и третью недели месяца.

crontab(0, 0,day_of_month='11',

month_of_year='5')

Выполняется одиннадцатого мая каждого года.

crontab(0, 0,

month_of_year='*/3')

Выполняется каждый день в первый месяц каждого квартала.

Более подробнее вы можете прочитать в документации.

Задачи часто используются для выполнения ненадежных операций, операций, зависящих от внешних ресурсов, или задачи, которые могут легко падать по различным причинам. Вот небольшие советы, чтобы сделать их более надежными:

  • Сделать задачи идемпотентными. Идемпотентная задача - это задача, которая, не изменяет состояние системы, если она остановлена на полпути. Задача либо полностью вносит изменения в систему, либо нет.
  • Повторение задачи. Если задача падает, рекомендуется попробовать выполнять ее снова и снова, пока она не будет выполнена успешно. Вы можете сделать это с помощью Celery Retry.

Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.

https://crontab.guru/ удобный инструмент для crontab )