Чтобы создавать периодические задачи, нам нужно определить их с помощью параметра beat_schedule. Celery Beat проверяет параметр beat_schedule для управления задачами,
from fastapi import FastAPI
from celery import Celery
from task import call_background_task
app = FastAPI()
celery = Celery(
__name__,
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0',
broker_connection_retry_on_startup=True
)
@app.get("/")
async def hello_world(message: str):
call_background_task.apply_async(args=[message], expires=3600)
return {'message': 'Hello World!'}
celery.conf.beat_schedule = {
'run-me-background-task': {
'task': 'task.call_background_task',
'schedule': 60.0,
'args': ('Test text message',)
}
}
Мы говорим beat_schedule запускать функцию call_background_task() каждые 60 секунд. Функция для запуска идентифицируется <имя модуля>.<имя функции>.
Если мы запустим это с помощью следующей команды:
celery -A main.celery beat --loglevel=info
В консоли мы увидим несколько строк вывода, которые каждые 60 секунд будут запускать наше задание.
[2024-05-10 11:01:29,028: INFO/MainProcess] Scheduler: Sending due task run-me-background-task (task.call_background_task)
[2024-05-10 11:02:29,027: INFO/MainProcess] Scheduler: Sending due task run-me-background-task (task.call_background_task)
[2024-05-10 11:03:29,027: INFO/MainProcess] Scheduler: Sending due task run-me-background-task (task.call_background_task)
task.call_background_task запускается каждые 60 секунд, но он на самом деле выполняется? Как было сказано ранее, Celery использует настроенный брокер сообщений для отправки и получения сообщений, в нашем примере Redis является брокером сообщений. Celery Beat отправляет задачу task.call_background_task в очередь Redis каждые 60 секунд.
Обработчики получают сообщения, извлекая их из очереди. В настоящий момент мы видим, что Celery Beat периодически добавляет задачи в очередь. Мы хотим, чтобы обработчики выполнили задачи.
В другом терминале запускаем нашего обработчика (worker):
celery -A main.celery worker --loglevel=info
И теперь уже в консоли мы увидим выполнение заданий:
[2024-05-10 11:01:29,031: INFO/MainProcess] Task task.call_background_task[f0c6f5be-49b9-45e3-968c-ab1bff1db51e] received
[2024-05-10 11:01:39,035: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 11:01:39,038: WARNING/ForkPoolWorker-8] Test text message
[2024-05-10 11:01:39,042: INFO/ForkPoolWorker-8] Task task.call_background_task[f0c6f5be-49b9-45e3-968c-ab1bff1db51e] succeeded in 10.00885904200004s: None
[2024-05-10 11:02:29,034: INFO/MainProcess] Task task.call_background_task[dd2d054b-d6b1-4649-bd17-09bca7f5f2e9] received
[2024-05-10 11:02:39,039: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 11:02:39,043: WARNING/ForkPoolWorker-8] Test text message
[2024-05-10 11:02:39,046: INFO/ForkPoolWorker-8] Task task.call_background_task[dd2d054b-d6b1-4649-bd17-09bca7f5f2e9] succeeded in 10.009045499999957s: None
[2024-05-10 11:03:29,034: INFO/MainProcess] Task task.call_background_task[677399cd-3d46-4fd5-b096-ab6aab1121ef] received
[2024-05-10 11:03:36,352: INFO/MainProcess] Task task.call_background_task[64f8eddf-b82c-48c8-be99-45dae3e77a2f] received
[2024-05-10 11:03:39,039: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-10 11:03:39,041: WARNING/ForkPoolWorker-8] Test text message
[2024-05-10 11:03:39,057: INFO/ForkPoolWorker-8] Task task.call_background_task[677399cd-3d46-4fd5-b096-ab6aab1121ef] succeeded in 10.019484375000047s: None
Как видим, обработчик получил задачу и вскоре она выполнена. Это простейший пример, но в реальном приложении время выполнения задачи может быть совсем иным.
Другой способ работы с запланированной задачей - использовать Crontab Schedules из Celery Вeat. Если нам нужно больше контроля над задачей для выполнения, мы можем использовать:
celery.conf.beat_schedule = {
'run-me-background-task': {
'task': 'task.call_background_task',
'schedule': crontab(hour=7, minute=0),
'args': ('Test text message',)
}
}
Здесь мы указываем Celery запускать task.call_background_task каждый день в 7:00.
Вы также можете указать различные расписания Celery Crontab:
|
Пример |
Значение |
|
|
Выполняется каждую минуту. |
|
|
Выполняется ежедневно в полночь. |
|
|
Выполняется каждые три часа: полночь, 3 утра, 6 утра, 9 утра, полдень, 15:00, 6 вечера, 9 вечера. |
|
|
То же, что и в прошлом. |
|
|
Выполняется каждые 15 минут. |
|
|
Выполняется каждую минуту (!) по воскресеньям. |
|
|
То же, что и в прошлом. |
|
|
Выполняется каждые десять минут, но только между 3–4 часами утра, 5–6 вечера и 22–23 часами по четвергам и пятницам. |
|
|
Выполняется в каждый чётный час и в каждый час делимый на три. Это означает: каждый час, кроме: 1:00, 5:00, 7:00, 11:00, 13:00, 17:00, 19:00, 23:00 |
|
|
Выполняется в каждый час, делимый на 5. |
|
|
Выполняется в каждый час, делимый на 3, и каждый час в рабочее время (с 8:00 до 17:00). |
|
|
Выполняется во второй день каждого месяца. |
|
|
Выполняется в каждый чётный день месяца. |
|
|
Выполняется в первую и третью недели месяца. |
|
|
Выполняется одиннадцатого мая каждого года. |
|
|
Выполняется каждый день в первый месяц каждого квартала. |
Более подробнее вы можете прочитать в документации.
Задачи часто используются для выполнения ненадежных операций, операций, зависящих от внешних ресурсов, или задачи, которые могут легко падать по различным причинам. Вот небольшие советы, чтобы сделать их более надежными:
- Сделать задачи идемпотентными. Идемпотентная задача - это задача, которая, не изменяет состояние системы, если она остановлена на полпути. Задача либо полностью вносит изменения в систему, либо нет.
- Повторение задачи. Если задача падает, рекомендуется попробовать выполнять ее снова и снова, пока она не будет выполнена успешно. Вы можете сделать это с помощью Celery Retry.
https://crontab.guru/ удобный инструмент для crontab )