6.5 Асинхронные задачи с Celery и Redis
7 из 7 шагов пройдено

Продолжим работу с кодом фоновых задач из прошлого раздела. Создадим новый проект, где в файл main.py добавим следующий код:

import time

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()


def call_background_task(message):
    time.sleep(10)
    print(f"Background Task called!")
    print(message)


@app.get("/")
async def hello_world(message: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(call_background_task, message)
    return {'message': 'Hello World!'}

Celery — это неблокирующая очередь задач, работающая в распределенной системе. Он может управлять асинхронными фоновыми процессами, которые огромны и требуют большой нагрузки на процессор. Это сторонний инструмент, поэтому сначала нам нужно установить его:

pip install Celery

Он планирует и выполняет задачи одновременно на одном сервере или в распределенной среде. Но для отправки и получения сообщений требуется транспорт сообщений, такой как Redis, база данных в памяти, которую можно использовать в качестве брокера сообщений для сообщений в строках, словарях, списках, наборах, растровых изображениях и типах потоков.

Обработчики и Брокеры (Workers & Brokers)

Для начала, нужно объяснить некоторые основные понятия, которые используются в Celery

Итак, Celery — это программа, которая отслеживает задачи (tasks), которые необходимо выполнить, и в которой есть набор обработчиков (workers), которые будут выполнять эти задачи. Основной смысл в том, что она (программа) может выполнять несколько задач параллельно и что она не блокирует поставщиков (producers) этих самых задач.

Celery на самом деле не хранит все эти задачи в памяти. Для хранения задач есть отдельный сервис, называемый брокером сообщений (message broker), который по сути своей является очередью. Обычно это либо Redis, либо RabbitMQ. Т.е. Celery следит за тем, что происходит в очереди, но хранится она внутри Redis/RabbitMQ.

Брокеры промежуточно отправляют сообщения между веб-приложением и Celery. В этом разделе мы будем использовать Redis. Redis прост в установке, и мы можем легко начать с него без особых проблем, установите его, следуя инструкциям на странице Redis Quick Start.

Также установим зависимости в наше приложение.

pip install redis

Запустите Redis сервер в отдельной консоли следующим образом: redis-server

Давайте добавим связанные с Celery/Redis конфиги в main.py:

import time

from celery import Celery
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

celery = Celery(
    __name__,
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0',
    broker_connection_retry_on_startup=True
)


def call_background_task(message):
    time.sleep(10)
    print(f"Background Task called!")
    print(message)


@app.get("/")
async def hello_world(message: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(call_background_task, message)
    return {'message': 'Hello World!'}

При запуске Celery создается 1 обработчик. Запустим в еще одной отдельной консоли его:

celery -A main.celery worker

Этот обработчик является главным процессом (supervisor process), который будет порождать дочерние процессы или потоки, которые в свою очередь будут выполнять задачи.

По умолчанию главный обработчик будет создавать дочерние процессы, а не потоки, и он создаст столько одновременных дочерних процессов, сколько ядер у процессора.

Главный процесс будет следить за тем, что происходит с задачами и процессами/потоками, но он не будет запускать сами задачи. Эта группа дочерних процессов или потоков, которая ожидает выполнения задач, называется пулом выполнения (execution pool) или пулом потоков (thread pool).

Продолжим в следующем шаге.


Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.
Нет обсуждений. Начните первое.