Продолжим работу с кодом фоновых задач из прошлого раздела. Создадим новый проект, где в файл main.py добавим следующий код:
import time
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
def call_background_task(message):
time.sleep(10)
print(f"Background Task called!")
print(message)
@app.get("/")
async def hello_world(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(call_background_task, message)
return {'message': 'Hello World!'}
Celery — это неблокирующая очередь задач, работающая в распределенной системе. Он может управлять асинхронными фоновыми процессами, которые огромны и требуют большой нагрузки на процессор. Это сторонний инструмент, поэтому сначала нам нужно установить его:
pip install Celery
Он планирует и выполняет задачи одновременно на одном сервере или в распределенной среде. Но для отправки и получения сообщений требуется транспорт сообщений, такой как Redis, база данных в памяти, которую можно использовать в качестве брокера сообщений для сообщений в строках, словарях, списках, наборах, растровых изображениях и типах потоков.
Обработчики и Брокеры (Workers & Brokers)
Для начала, нужно объяснить некоторые основные понятия, которые используются в Celery.
Итак, Celery — это программа, которая отслеживает задачи (tasks), которые необходимо выполнить, и в которой есть набор обработчиков (workers), которые будут выполнять эти задачи. Основной смысл в том, что она (программа) может выполнять несколько задач параллельно и что она не блокирует поставщиков (producers) этих самых задач.
Celery на самом деле не хранит все эти задачи в памяти. Для хранения задач есть отдельный сервис, называемый брокером сообщений (message broker), который по сути своей является очередью. Обычно это либо Redis, либо RabbitMQ. Т.е. Celery следит за тем, что происходит в очереди, но хранится она внутри Redis/RabbitMQ.
Брокеры промежуточно отправляют сообщения между веб-приложением и Celery. В этом разделе мы будем использовать Redis. Redis прост в установке, и мы можем легко начать с него без особых проблем, установите его, следуя инструкциям на странице Redis Quick Start.
Также установим зависимости в наше приложение.
pip install redis
Запустите Redis сервер в отдельной консоли следующим образом: redis-server
Давайте добавим связанные с Celery/Redis конфиги в main.py:
import time
from celery import Celery
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()
celery = Celery(
__name__,
broker='redis://127.0.0.1:6379/0',
backend='redis://127.0.0.1:6379/0',
broker_connection_retry_on_startup=True
)
def call_background_task(message):
time.sleep(10)
print(f"Background Task called!")
print(message)
@app.get("/")
async def hello_world(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(call_background_task, message)
return {'message': 'Hello World!'}
При запуске Celery создается 1 обработчик. Запустим в еще одной отдельной консоли его:
celery -A main.celery worker
Этот обработчик является главным процессом (supervisor process), который будет порождать дочерние процессы или потоки, которые в свою очередь будут выполнять задачи.
По умолчанию главный обработчик будет создавать дочерние процессы, а не потоки, и он создаст столько одновременных дочерних процессов, сколько ядер у процессора.
Главный процесс будет следить за тем, что происходит с задачами и процессами/потоками, но он не будет запускать сами задачи. Эта группа дочерних процессов или потоков, которая ожидает выполнения задач, называется пулом выполнения (execution pool) или пулом потоков (thread pool).
Продолжим в следующем шаге.