В отличие от предыдущей схемах OAuth2, мы будем внедрять зависимость get_current_user() в каждую службу API, чтобы обеспечить безопасность и ограничить доступ. Внедренный экземпляр OAuth2PasswordBearer вернет JWT для извлечения полезных данных с использованием декодеров JOSE с указанным алгоритмом декодирования. Если токен подделан, изменен или срок его действия истек, метод вызовет исключение.
Напишем эту функцию в файле routers/auth.py:
from jose import jwt, JWTError
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get('sub')
user_id: int = payload.get('id')
is_admin: str = payload.get('is_admin')
is_supplier: str = payload.get('is_supplier')
is_customer: str = payload.get('is_customer')
expire = payload.get('exp')
if username is None or user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Could not validate user'
)
if expire is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No access token supplied"
)
if datetime.now() > datetime.fromtimestamp(expire):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Token expired!"
)
return {
'username': username,
'id': user_id,
'is_admin': is_admin,
'is_supplier': is_supplier,
'is_customer': is_customer,
}
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Could not validate user'
)
В этом коде, мы декодируем полученный токен в словарь payload, используя секретный ключ и алгоритм. Далее мы извлекаем содержимое ключей словаря, и запускаем проверки. Сначала функция проверяет ключи username и user_id. Если содержимое данных ключей не найдено, то вызываем ошибку валидации пользователя. Далее функция проверяет срок действия токена. Если срок действия не указан, значит, токен не был предоставлен. Следующая проверка — валидность токена — генерируется исключение, информирующее пользователя об истечении срока действия токена. Если токен действителен, возвращается декодированная полезная нагрузка.
В блоке except для любой ошибки JWT выдается исключение неверного запроса.
Теперь, когда мы реализовали функции для создания и проверки токенов, отправляемых в приложение, давайте создадим функцию, которая проверяет аутентификацию пользователя и служит зависимостью.
get_current_user() должен быть внедрен в каждую реализацию сервиса, чтобы ограничить доступ пользователей. Но на этот раз метод не только проверит учетные данные, но и выполнит декодирование полезной нагрузки JWT.
@router.get('/read_current_user')
async def read_current_user(user: User = Depends(get_current_user)):
return {'User': user}
Запустим сервер и проверим работу. Нажмем на кнопку «Authorize» в правом верхнем углу документации и заполним форму входа. Далее перейдем к конечной точке read_current_user и попробуем отправить запрос.
Обратите внимание, в заголовке запроса у нас присутствует сам код токена. И в теле ответа увидим все полученные данные из токена. В случае если срок действия токена закончился, мы увидим сообщение об ошибке:
Теперь, когда мы успешно создали зависимость для защиты маршрутов, давайте обновим права доступа в наших маршрутах, а также добавим функцию аутентификации для них.