5.12 Управление правами доступа в FastAPI
5 из 5 шагов пройдено

Управление правами доступа

Большинство приложений требуют определенного уровня управления доступом. В данном разделе мы реализуем простейшую возможность управлять правами доступа к нашим конечным точкам.

Как мы уже помним из раздела Реализации аутентификации, у модели User есть следующие поля:

  •     is_admin - Поле хранящее булево значение, значение True будет использоваться только для администратора, чтобы предоставить доступ к некоторым разделам API. По умолчанию значение False.
  •     is_supplier - Поле хранящее булево значение, значение True будет использоваться только для продавцов товара, чтобы предоставить доступ к некоторым разделам API, например как добавление товара. По умолчанию значение False.
  •     is_customer - Поле хранящее булево значение, значение True будет использоваться только для покупателей товара. По умолчанию значение True.

И именно их мы передаем в JWT токене, чтобы не обращаться к БД при каждом запросе. Разберем основные права, которые мы реализуем.

Рассмотрим данную систему подробнее. Первым делом изменим права в базе данных у созданного нами пользователя в разделе 5.10 Реализация аутентификации используя HTTP Basic Auth

Мы изменим следующие поля:

  • is_admin = True, наш пользователь имеет права администратора
  • is_supplier = False, наш пользователь не продавец
  • is_customer = False, наш пользователь не покупатель

Теперь давайте создадим конечную точку, которую мы будем использовать администратором, только для изменения прав доступа пользователей и их удаления. В папке routers создадим файл permission.py и приступим к написанию кода:

from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select, update
from starlette import status
from .auth import get_current_user
from app.models.user import User
from sqlalchemy.orm import Session
from app.backend.db_depends import get_db
from sqlalchemy.ext.asyncio import AsyncSession

router = APIRouter(prefix='/permission', tags=['permission'])


@router.patch('/')
async def supplier_permission(db: Annotated[AsyncSession, Depends(get_db)], get_user: Annotated[dict, Depends(get_current_user)],
                          user_id: int):
    if get_user.get('is_admin'):
        user = await db.scalar(select(User).where(User.id == user_id))

        if not user:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail='User not found'
            )
        if user.is_supplier:
            await db.execute(update(User).where(User.id == user_id).values(is_supplier=False, is_customer=True))
            await db.commit()
            return {
                'status_code': status.HTTP_200_OK,
                'detail': 'User is no longer supplier'
            }
        else:
            await db.execute(update(User).where(User.id == user_id).values(is_supplier=True, is_customer=False))
            await db.commit()
            return {
                'status_code': status.HTTP_200_OK,
                'detail': 'User is now supplier'
            }
    else:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="You don't have admin permission"
        )

Разберем данный код подробно. Мы подключаем зависимости от сессий БД, и от функции get_current_user(), которая декодирует наш JWT токен, извлекая из него следующие параметры:

'username': username,
'id': user_id,
'is_admin': is_admin,
'is_supplier': is_supplier,
'is_customer': is_customer,

Далее мы создаем условие if get_user.get('is_admin'), то есть если у пользователя, обратившегося к данной конечной точке, поле is_admin возвращается как True, то значит он может выполнять основную логику конечной точки. Иначе вызываем ошибку 401 с текстом, что у пользователя нет прав администратора.

Если наше условие истинное, то мы получаем пользователя с user_id из параметра запроса: user = await db.scalar(select(User).where(User.id == user_id)). В случае если пользователь не найден, вызываем 404 ошибку с соответствующим текстом.

Далее хотелось бы остановиться на этом коде:

        if user.is_supplier:
            await db.execute(update(User).where(User.id == user_id).values(is_supplier=False, is_customer=True))
            await db.commit()
            return {
                'status_code': status.HTTP_200_OK,
                'detail': 'User is no longer supplier'
            }
        else:
            await db.execute(update(User).where(User.id == user_id).values(is_supplier=True, is_customer=False))
            await db.commit()
            return {
                'status_code': status.HTTP_200_OK,
                'detail': 'User is now supplier'
            }

На самом деле все просто

  • Если у пользователя с введенным user_id, поле is_supplier = True, то мы устанавливаем ему значение False, чтобы убрать права доступа продавца и назначить права покупателя is_customer=True
  • Если поле is_supplier = False, то устанавливаем ему значение True, чтобы назначить пользователя как продавца и убрать права покупателя is_customer=False

В обоих случаях возвращаем 200 код ответа, и текстом соответствующим действиям.

Не забываем подключить маршруты в main.py:

from app.routers import permission

app.include_router(permission.router)

В следующем шаге перейдем к тестированию работы. 


Будьте вежливы и соблюдайте наши принципы сообщества. Пожалуйста, не оставляйте решения и подсказки в комментариях, для этого есть отдельный форум.
Нет обсуждений. Начните первое.