5.11 Реализация аутентификации с помощью OAuth2 и JWT
7 из 7 шагов пройдено
2 из 2 баллов  получено

В этом разделе мы создадим систему аутентификации для нашего приложения. Мы будем использовать поток паролей OAuth2, который требует от клиента отправки имени пользователя и пароля в качестве данных формы.

Установка модуля python-multipart

Поскольку аутентификация OAuth2 невозможна без процедуры обработки формы, нам необходимо установить модуль python-multipart, прежде чем приступить к реализации. Мы можем запустить следующую команду, чтобы установить расширение:

pip install python-multipart

 

Для продолжения в файле routers/auth.py удалим лишний код, отвечающий за базовую авторизацию, и оставим лишь конечную точку создания пользователей:

from fastapi import APIRouter, Depends, status, HTTPException
from sqlalchemy import select, insert
from app.models.user import User
from app.schemas import CreateUser
from app.backend.db_depends import get_db
from typing import Annotated
from sqlalchemy.orm import Session
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix='/auth', tags=['auth'])
bcrypt_context = CryptContext(schemes=['bcrypt'], deprecated='auto')


@router.post('/')
async def create_user(db: Annotated[AsyncSession, Depends(get_db)], create_user: CreateUser):
    await db.execute(insert(User).values(first_name=create_user.first_name,
                                         last_name=create_user.last_name,
                                         username=create_user.username,
                                         email=create_user.email,
                                         hashed_password=bcrypt_context.hash(create_user.password),
                                         ))
    await db.commit()
    return {
        'status_code': status.HTTP_201_CREATED,
        'transaction': 'Successful'
    }

И приступим к реализации OAuth2 авторизации. В файле routers/auth.py добавим следующую функцию:

async def authanticate_user(db: Annotated[AsyncSession, Depends(get_db)], username: str, password: str):
    user = await db.scalar(select(User).where(User.username == username))
    if not user or not bcrypt_context.verify(password, user.hashed_password) or user.is_active == False:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

authanticate_user() — это внедряемая функция, которая будет получать имя пользователя и хэш пароля, и проверять их. Она почти не отличается от функции get_current_username из прошлого раздела.

Использование OAuth2PasswordBearer и OAuth2PasswordRequestForm

FastAPI поддерживает OAuth2, особенно тип потока паролей спецификации OAuth2. Его модуль fastapi.security имеет OAuth2PasswordBearer, который служит поставщиком аутентификации на основе пароля. Он также имеет OAuth2PasswordRequestForm, который может объявлять тело формы с обязательными параметрами, именем пользователя и паролем, а также некоторыми дополнительными параметрами, такими как scope, grant_type, client_id и client_secret.

Этот класс вводится непосредственно в конечную точку API для извлечения всех значений параметров из формы входа в браузере. Итак, давайте начнем с создания OAuth2PasswordBearer, который будет внедрен в зависимость нашей конечной точки, которая будет проверять пользователя, добавим следующий код в файл routers/auth.py:

from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")


@router.post('/token')
async def login(db: Annotated[AsyncSession, Depends(get_db)], form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    user = await authanticate_user(db, form_data.username, form_data.password)

    if not user or user.is_active == False:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Could not validate user'
        )

    return {
        'access_token': user.username,
        'token_type': 'bearer'
    }

Функция login() получает из данные логина и пароля, и через функцию authanticate_user проверяет равно ли значение полученного пароля хэшу пароля из БД запрошенной учетной записи. Если все проверки пройдены успешно, конечная точка должна вернуть объект JSON с необходимыми свойствами access_token и token_type.

Продолжим в следующем шаге.


  • Комментария
Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.

@router.post('/token')

async def login(db: Annotated[AsyncSession, Depends(get_db)], form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):

    user = await authanticate_user(db, form_data.username, form_data.password)

    if not user or user.is_active == False:

        raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail='Could not validate user' )

    return { 'access_token': user.username, 'token_type': 'bearer' }

1) Доброго дня коллеги! Нам точно нужно в функции login ссылаться "депенсом" на пустоту?

2) Мы активность учетки проверяем в функции authanticate_user, нам точно надо еще раз это делать?

3) В пункте "Использование OAuth2PasswordBearer и OAuth2PasswordRequestForm" не очевидно, что добавлять записи нужно в файле auth.py, может добавить?

Изменен Алексей Бойко

@Алексей_Бойко,

1) Да, все верно, именно так и используется OAuth2PasswordRequestForm в документации. Если не использовать аннотации, то будет выглядеть как form_data: OAuth2PasswordRequestForm = Depends()

2) Да, можно убрать эту проверку из конечной точки.

3) Спасибо, добавил.

user = await authanticate_user(db, form_data.username, form_data.password) не найден authanticate_user,как ты куда поставил его?

@Anonymous_538798485, все понял!исправил его функцию