6.5 Асинхронные задачи с Celery и Redis
7 из 7 шагов пройдено

Чтобы было проще отслеживать, запустим брокер Celery со следующими параметрами:

celery -A main.celery worker --loglevel=info

 

Теперь мы видим нашу задачу:

[tasks]
  . main.call_background_task

Запустим сервер, перейдем в документацию, и попробуем отправить GET запрос:

И сразу же мы увидим в консоли Celery cледующий лог:

[2024-05-09 11:28:34,970: INFO/MainProcess] Task main.call_background_task[7cb1d3e2-d982-43d0-8351-6331f3c877a8] received

И через 10 секунд мы увидим следующее:

[2024-05-09 11:28:44,975: WARNING/ForkPoolWorker-8] Background Task called!
[2024-05-09 11:28:44,977: WARNING/ForkPoolWorker-8] Test Celery message
[2024-05-09 11:28:44,989: INFO/ForkPoolWorker-8] Task main.call_background_task[7cb1d3e2-d982-43d0-8351-6331f3c877a8] succeeded in 10.016356958998585s: None

То есть наша задача успешно отработала.

На данный момент, наше приложение FastAPI и задачи Celery хранятся в одном файле. Давайте рассмотрим следующий вариант, когда у нас будет основной файл main.py, в котором находится приложение FastAPI и второй файл task.py, в котором будут храниться наши задачи.

Изменим код файла main.py:

from fastapi import FastAPI
from celery import Celery
from task import call_background_task

app = FastAPI()

celery = Celery(
    __name__,
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0',
    broker_connection_retry_on_startup=True
)


@app.get("/")
async def hello_world(message: str):
    call_background_task.delay(message)
    return {'message': 'Hello World!'}

Создадим файл task.py следующего содержания:

import time
from celery import shared_task

@shared_task()
def call_background_task(message):
    time.sleep(10)
    print(f"Background Task called!")
    print(message)

Вместо того, чтобы использовать @celery.task, мы используем декоратор @shared_task. Если мы этого не сделаем, это приведет к проблеме круговой зависимости. main.py потребуется tasks.py для определения функции call_background_task, а task.py потребуется экземпляр Celery, импортированный из main.py

Теперь мы можем запустить сервер uvicorn и celery, на этот раз логика работы не изменится.


Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.
Нет обсуждений. Начните первое.