5.11 Реализация аутентификации с помощью OAuth2 и JWT
7 из 7 шагов пройдено
2 из 2 баллов  получено

В отличие от предыдущей схемах OAuth2, мы будем внедрять зависимость get_current_user() в каждую службу API, чтобы обеспечить безопасность и ограничить доступ. Внедренный экземпляр OAuth2PasswordBearer вернет JWT для извлечения полезных данных с использованием декодеров JOSE с указанным алгоритмом декодирования. Если токен подделан, изменен или срок его действия истек, метод вызовет исключение.

Напишем эту функцию в файле routers/auth.py:

from jose import jwt, JWTError


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get('sub')
        user_id: int = payload.get('id')
        is_admin: str = payload.get('is_admin')
        is_supplier: str = payload.get('is_supplier')
        is_customer: str = payload.get('is_customer')
        expire = payload.get('exp')
        if username is None or user_id is None:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail='Could not validate user'
            )
        if expire is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="No access token supplied"
            )
        if datetime.now() > datetime.fromtimestamp(expire):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Token expired!"
            )

        return {
            'username': username,
            'id': user_id,
            'is_admin': is_admin,
            'is_supplier': is_supplier,
            'is_customer': is_customer,
        }
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Could not validate user'
        )

В этом коде, мы декодируем полученный токен в словарь payload, используя секретный ключ и алгоритм. Далее мы извлекаем содержимое ключей словаря, и запускаем проверки. Сначала функция проверяет ключи username и user_id. Если содержимое данных ключей не найдено, то вызываем ошибку валидации пользователя. Далее функция проверяет срок действия токена. Если срок действия не указан, значит, токен не был предоставлен. Следующая проверка — валидность токена — генерируется исключение, информирующее пользователя об истечении срока действия токена. Если токен действителен, возвращается декодированная полезная нагрузка.

В блоке except для любой ошибки JWT выдается исключение неверного запроса.

Теперь, когда мы реализовали функции для создания и проверки токенов, отправляемых в приложение, давайте создадим функцию, которая проверяет аутентификацию пользователя и служит зависимостью.

get_current_user() должен быть внедрен в каждую реализацию сервиса, чтобы ограничить доступ пользователей. Но на этот раз метод не только проверит учетные данные, но и выполнит декодирование полезной нагрузки JWT.

@router.get('/read_current_user')
async def read_current_user(user: User = Depends(get_current_user)):
    return {'User': user}

Запустим сервер и проверим работу. Нажмем на кнопку «Authorize» в правом верхнем углу документации и заполним форму входа. Далее перейдем к конечной точке read_current_user и попробуем отправить запрос.

Обратите внимание, в заголовке запроса у нас присутствует сам код токена. И в теле ответа увидим все полученные данные из токена. В случае если срок действия токена закончился, мы увидим сообщение об ошибке:

Теперь, когда мы успешно создали зависимость для защиты маршрутов, давайте обновим права доступа в наших маршрутах, а также добавим функцию аутентификации для них.


Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.
Нет обсуждений. Начните первое.