Продвинутый Django 5 для продолжающих

Прогресс по курсу:  0/193

9.3 Асинхронная отправка электронных писем
2 из 2 шагов пройдено

Задачи (Tasks)

Задачи иногда также называют сообщениями. По сути брокер сообщений - это нечто, что передает сообщения из одной системы в другую. В нашем случае сообщение представляет собой описание задачи: название (уникальный идентификатор), входные параметры, время ожидания, количество повторных попыток и тд. 

В celery задача является классом. Таким образом, каждый раз, когда вы используете декоратор для функции (например, @shared_task), чтобы сделать ее celery задачей, под капотом создается класс. Это означает, что у каждой задачи есть self, к которому добавляется множество атрибутов, например: namerequeststatuspriorityretries и многое другое. Если мы хотим получить доступ к этим атрибутам, то нужно указать параметр bind=True.

Прежде чем что-либо может быть запущено в Celery, оно должно быть декларировано как задача. 

Вот как это сделать, добавим в main/tasks.py следующий код:

from django.urls import reverse
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from publish.celery import app


@app.task
def send_verification_email(user_id):
    UserModel = get_user_model()
    try:
        user = UserModel.objects.get(pk=user_id)
        send_mail(
            'Verify your  account',
            'Follow this link to verify your account: '
            'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(user.verification_uuid)}),
            'admin@localhost.ru',
            [user.email],
            fail_silently=False,
        )
    except UserModel.DoesNotExist:
        print("Tried to send verification email to non-existing user '%s'" % user_id)


Мы здесь сделали следующее: мы переместили функцию отправки по почты в другой файл под названием tasks.py

Несколько примечаний:

  • Имя файла имеет значение. Celery проходит через все приложения в INSTALLED_APPS и регистрирует задачи в файлах tasks.py.
  • Обратите внимание, как мы декорировали send_verification_email с помощью @app.task. Это указывает Celery, что это задача, которая будет выполняться в очереди задач.
  • Обратите внимание, что мы ожидаем аргумент user_id, а не объект User. Это связано с тем, что при отправке задач на Celery может возникнуть проблема с сериализацией сложных объектов. Лучше использовать примитивные типы.


Возвращаясь к main/models.py, код сигнала преобразуется в:

from django.db.models import signals
from main.tasks import send_verification_email


def user_post_save(sender, instance, signal, *args, **kwargs):
    if not instance.is_verified:
        # Send verification email
        send_verification_email.delay(instance.pk)


signals.post_save.connect(user_post_save, sender=User)


Обратите внимание, как мы вызываем метод .delay объекта задачи. Это означает, что мы отправляем задание на Celery, и мы не ожидаем результата. Если бы мы использовали send_verification_email (instance.pk), мы все равно отправили бы его в Celery и ждали завершения задачи, чего мы не хотим.


Прежде чем Вы начнете создавать нового пользователя, есть есть загвоздка. Celery - это сервис, и нам нужно его запустить. Откройте новую консоль и введите. На данном этапе мы используем 2 терминала, первый это запущенный Redis, второй это запущенный Celery.

celery -A publish worker --loglevel=info

Эта команда запустит процесс воркеров Celery.


Для пользователей Windows сначала необходимо установить библиотеку eventlet:

pip install eventlet

А для запуска необходимо использовать такую команду:

celery -A publish worker -P eventlet --loglevel=info


Мы также видим что в списке задач у нас появилась задача main.tasks.send_verification_email:


Осталось запустить сервер еще в одном терминале и теперь Вы можете, наконец, перейти к созданию пользователя.


После создания, обратите внимание на то, что нет задержки при создании, и также мы можем следить за логами в консоли Celery и посмотреть, правильно ли выполняются задачи. Это должно выглядеть примерно так:


Осталось проверить почту:


Как мы видим, отправка писем через Celery прекрасно работает и мы избавились от задержки отправки письма при создании пользователя.


  • Комментария
Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.
Оставить комментарий

У меня выводит только [2023-09-10 12:16:06,720: INFO/MainProcess] Task main.tasks.send_verification_email[f0dbf428-eeed-44b5-b2b7-3b8b910c9e40] received, и далее чайлд процессы, а succeed не наблюдается, соответственно email не приходит(без использования celery приходил)
я на windows, в чем может быть проблема?

@Павел_Лепешинский, починил, но с использованием eventlet, вот так celery -A publish worker -P eventlet -c 1000 --loglevel=info

так предпочтительней, получается, запускать?

@Павел_Лепешинский, Похоже что это особенности работы celery под Windows. И придется запускать через библиотеку eventlet.

Спасибо, буду изучать)

Получается не имеет значение в какое приложение создать celery.py с его настройками?

@Ilia_Boiarintsev, да, одна из лучших практик на большом проекте будет выглядеть вот так:

project/
  project/
      settings.py
      celery.py <- Настройки Celery
      urls.py
  account/
      models.py
      tasks.py <-- Таски
      views.py 
  blog/
      models.py
      tasks.py <-- Таски
      views.py
  media/
      models.py
      tasks.py <-- Таски
      views.py
  manage.py

[tasks]
  . main.tasks.send_verification_email

[2023-11-07 17:24:27,192: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 10061] WSAECONNREFUSED.
Trying again in 2.00 seconds... (1/100)

Подскажите, в чем может быть проблема? У меня Windows 10. В прошлом уроке Redis настроил через окно терминала Ubuntu. Здесь запустил Redis также через Ubuntu, запустил celery через PyCharm и выдает такое сообщение. При запуске сервера и при вводе пароля admin выходит ошибка 

OperationalError at /admin/login/

[WinError 10061] Подключение не установлено, т.к. конечный компьютер отверг запрос на подключение
А команда redis-cli что отображает? И почему порт не 6379?
Изменен Илья Перминов

@Илья_Перминов, astgregory@DESKTOP-ED97ANC:/mnt/g/PyProjects/Study/django_celery$ redis-cli
127.0.0.1:6379> ping
PONG
127.0.0.1:6379>
astgregory@DESKTOP-ED97ANC:/mnt/g/PyProjects/Study/django_celery$ redis-cli lrange celery 0 100
(empty array)

Почему такой порт не знаю, копировал всё, что написано было.

@Григорий_Кожанов, Проверьте порт в этом файле. Redis сидит на 6379 порту, и в вашем случае он успешно отвечает.

Давайте добавим связанные с Celery/Redis конфиги в django_celery/settings.py:

REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'

@Илья_Перминов, На Windows 10 так и не получилось настроить. Перешел на Linux и всё заработало. Вот только команда redis-server не срабатывает в терминале. пишет:

Could not create server TCP listening socket *:6379: bind: Address already in use

@Григорий_Кожанов, Может это поможет?

@Илья_Перминов, Да, спасибо, помогло :)