6.5 Асинхронные задачи с Celery и Redis
7 из 7 шагов пройдено

 Очереди (Queues)

Да, тут намеренно используется множественное число для очередей, потому что существует несколько видов очередей.

Прежде всего, существует главная очередь (main queue), которая принимает задачи от поставщиков (producers) по мере их поступления и передает обработчикам по мере их запроса. По умолчанию есть только одна такая очередь. Все обработчики принимают задачи из одной очереди. Но вы также можете указать несколько таких очередей и назначить конкретные обработчики на определенные очереди. Очередь по умолчанию называется celery.


Чтобы просмотреть первые 100 задач в очереди в Redis, выполните:

redis-cli lrange celery 0 100

Эти очереди сильно напоминают принцип FIFO (First In First Out), но это не совсем так. Задачи, которые сначала помещаются в очередь, первыми удаляются из очереди, но они не обязательно выполняются первыми.

 

Задачи (Tasks)

Задачи иногда также называют сообщениями. По сути брокер сообщений - это нечто, что передает сообщения из одной системы в другую. В нашем случае сообщение представляет собой описание задачи: название (уникальный идентификатор), входные параметры, время ожидания, количество повторных попыток и тд. 

Прежде чем что-либо может быть запущено в Celery, оно должно быть декларировано как задача. 

import time

from celery import Celery
from fastapi import FastAPI

app = FastAPI()

celery = Celery(
    __name__,
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0',
    broker_connection_retry_on_startup=True
)


@celery.task
def call_background_task(message):
    time.sleep(10)
    print(f"Background Task called!")
    print(message)


@app.get("/")
async def hello_world(message: str):
    call_background_task.delay(message)
    return {'message': 'Hello World!'}

Обратите внимание, как мы декорировали call_background_task с помощью @celery.task. Это указывает Celery, что это задача, которая будет выполняться в очереди задач.

Основная цель экземпляра Celery — аннотировать методы Python, чтобы они стали задачами. Экземпляр Celery имеет декоратор task(), который мы можем применять ко всем вызываемым процедурам, которые мы хотим определить как асинхронные задачи. Частью декоратора task() является имя задачи, необязательное уникальное имя, состоящее из пакета, имен модулей и имени метода транзакции.

У него есть и другие атрибуты, которые могут уточнить определение задачи, например список auto_retry, в котором регистрируются классы исключений, которые могут вызывать повторные попытки выполнения при их создании, и max_tries, который ограничивает количество повторных выполнений задачи.

Например:

@celery.task(name="my_first.tasks", auto_retry=[ValueError, TypeError], max_tries=5)

Данная задача имеет только пять повторных попыток выполнения, если во время выполнения она сталкивается с ValueError или TypeError.

Осталось только запустить. Для этого есть три способа это сделать: apply_asyncdelay и обычный вызов call.

  • apply_async - это метод, который предоставляет максимальную гибкость при запуске задачи и принимает большое количество аргументов.
  • call_background_task.apply_async(args=[arg1_value], kwargs={'key': 'value'})
  • delay - в отличие от apply_async имеет ограниченный список принимаемых аргументов. Такой способ запуска мы рассматриваем, когда нужно просто запустить задачу без необходимости передавать именованные аргументы и другие параметры.
    call_background_task.delay(arg1_value, arg2_value)

    Этот метод часто используется, когда задача принимает всего несколько аргументов и нам нужно просто её запустить.

  • Последний способ - это обычный вызов функции. В таком случае задача будет выполнена сразу же, а не помещена в очередь.

    call_background_task(arg1_value, arg2_value)

Мы в нашем случае используем метод delay, передавая внутри message. В следующем шаге приступим к практике.


Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.
Нет обсуждений. Начните первое.