Прежде чем начать изучение Celery, давайте начнем с самого простого инструмента, который поможет нам понять фоновые задачи. У FastAPI уже есть класс BackgroundTasks, который может помочь нам реализовать простые фоновые задачи.
Давайте создадим новый проект и начнем реализацию.
import time
from fastapi import FastAPI
app = FastAPI()
def call_background_task():
time.sleep(10)
print(f"Background Task called!")
@app.get("/")
async def hello_world():
call_background_task()
return {'message': 'Hello World!'}
В приведенном выше примере при переходе в конечную точку hello_world мы вызываем функцию call_background_task, которая ждет 10 секунд и выводит в консоль текст "Background Task called!"
Запустим сервер и проверим работу перейдя http://127.0.0.1:8000 мы увидим что загрузка страницы заняла около 10 секунд. Это из-за 10-секундной задержки в функции call_background_task, которую мы добавили. Это не идеальное поведение. Мы излишне заставляем пользователя ждать выполнение логики.
Есть 2 основные проблемы, которые мы можем наблюдать:
- Запрос блокируется, т.е. требуется 10 секунд.
- Если мы отправим более 1 запроса, они не будут обрабатываться параллельно. Мы можем это проверить открыв несколько вкладок.
Чтобы улучшить работу, мы можем убрать логику выполнения в фоновом режиме. Для этого мы можем использовать класс BackgroundTasks.
Это самый простой и рекомендуемый способ выполнения фоновых задач в FastAPI. Вам просто нужно импортировать BackgroundTasks из fastapi и объявить параметр типа BackgroundTasks в функции операции пути. Затем вы можете использовать метод add_task() для регистрации функции задачи, которая будет выполняться в фоновом режиме после возврата ответа.
import time
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def call_background_task():
time.sleep(10)
print(f"Background Task called!")
@app.get("/")
async def hello_world(background_tasks: BackgroundTasks):
background_tasks.add_task(call_background_task)
return {'message': 'Hello World!'}
Запустим сервер и проверим работу перейдя http://127.0.0.1:8000 мы увидим что загрузка страницы уже не заняла 10 секунд. Мы получаем ответ, как только попадаем в конечную точку URL. И уже после будут выполнены наши фоновые задания.
Если фоновому процессу требуются аргументы, мы передаем эти аргументы в функцию add_task() сразу после ее первого параметра.
import time
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def call_background_task(message):
time.sleep(10)
print(f"Background Task called!")
print(message)
@app.get("/")
async def hello_world(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(call_background_task, message)
return {'message': 'Hello World!'}
Класс BackgroundTasks хорош, однако у него есть свои ограничения. На данный момент у него нет некоторых расширенных функций, таких как повторение задачи, оркестровка задач, отслеживание задачи или ее отмена. Другое основное ограничение заключается в том, что когда мы выполняем сверхтяжелые вычисления, мы можем не хотеть, чтобы задача выполнялась в том же процессе. Иногда мы можем захотеть распределить задачи по нескольким серверам. В таких случаях Celery является одной из лучших альтернатив. Мы начнем изучать это в следующем разделе.