Chepuhagram/srv/app/core/security.py

246 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import FastAPI, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta, timezone
from jose import jwt
import hashlib
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db import models
from dotenv import load_dotenv
from jose import JWTError, jwt
import os
import bcrypt
from sqlalchemy.exc import IntegrityError
from typing import Optional
load_dotenv()
SECRET_KEY = os.getenv("JWT_KEY")
if not SECRET_KEY:
raise RuntimeError("JWT_KEY environment variable not set")
SECRET_KEY = SECRET_KEY.strip()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 60
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login-oauth", description=(
"### Инструкция по авторизации\n\n"
"1. Введите ваш **username** и **password**.\n"
"2. В поле **client_secret** введите текущий **6-значный код TOTP**, если он подключен\n"
"3. Поле *client_id* оставьте пустым."
))
# бд
async def get_db():
async with models.AsyncSessionLocal() as db:
try:
yield db
finally:
await db.close()
def verify_password(plain_password, hashed_password):
try:
return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password)
except TypeError:
return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8'))
def get_password_hash(password):
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
def create_access_token(data: dict, session_id: int = None):
to_encode = data.copy()
if session_id:
to_encode.update({"session_id": session_id})
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": int(expire.timestamp())})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict, session_id: int = None):
to_encode = data.copy()
if session_id:
to_encode.update({"session_id": session_id})
expire = datetime.now(timezone.utc) + timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": int(expire.timestamp())})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def _parse_device_name(request: Request) -> str:
device_name = request.headers.get("x-device-name")
if device_name:
return device_name
user_agent = request.headers.get("user-agent", "").lower()
if "android" in user_agent:
return "Android Device"
elif "iphone" in user_agent or "ipad" in user_agent:
return "iOS Device"
elif "windows" in user_agent:
return "Windows PC"
elif "macintosh" in user_agent:
return "macOS Device"
elif "linux" in user_agent:
return "Linux PC"
return "Неизвестное устройство"
# ПРОВЕРКА ТОКЕНА ДЛЯ HTTP ЗАПРОСОВ
async def get_current_user(
request: Request,
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
session_id: Optional[int] = payload.get(
"session_id") # <-- Читаем ID сессии
if user_id is None:
raise credentials_exception
if db is not None:
if session_id:
# === СТРОГИЙ РЕЖИМ ДЛЯ НОВЫХ КЛИЕНТОВ ===
session_result = await db.execute(
select(models.Session).where(
models.Session.id == int(session_id))
)
session = session_result.scalars().first()
if session is None:
# Если сессия была удалена (например, через Logout на другом устройстве)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Сессия отозвана")
session.last_active = datetime.now(timezone.utc)
await db.commit()
else:
# === РЕЖИМ СОВМЕСТИМОСТИ ДЛЯ СТАРЫХ КЛИЕНТОВ ===
session_result = await db.execute(
select(models.Session).where(models.Session.session_token == token)
)
session = session_result.scalars().first()
if session is None:
# ИЩЕМ УЖЕ СУЩЕСТВУЮЩУЮ ЗАГЛУШКУ ЭТОГО ПОЛЬЗОВАТЕЛЯ, ЧТОБЫ НЕ ПЛОДИТЬ НОВЫЕ
existing_legacy_result = await db.execute(
select(models.Session).where(
models.Session.user_id == int(user_id),
models.Session.ip_address == "0.0.0.0" # Признак нашей заглушки
)
)
existing_legacy = existing_legacy_result.scalars().first()
if existing_legacy:
# Просто обновляем старую заглушку (перезаписываем токен и время)
existing_legacy.session_token = token
existing_legacy.last_active = datetime.now(timezone.utc)
await db.commit()
else:
# Если заглушки еще нет вообще, создаем ОДИН раз
session = models.Session(
user_id=int(user_id),
session_token=token,
device_name="Старая версия приложения",
ip_address="0.0.0.0",
created_at=datetime.now(timezone.utc),
last_active=datetime.now(timezone.utc)
)
db.add(session)
await db.commit()
else:
session.last_active = datetime.now(timezone.utc)
await db.commit()
result = await db.execute(select(models.User).where(models.User.id == user_id))
user = result.scalars().first()
if user is None:
raise credentials_exception
if getattr(user, "is_blocked", 0) == 1:
raise HTTPException(status_code=403, detail="Ваш аккаунт заблокирован")
return user
except JWTError:
raise credentials_exception
# ПРОВЕРКА ТОКЕНА ДЛЯ ВЕБСОКЕТОВ
async def test_token(token: str, db: AsyncSession = None):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
session_id: Optional[int] = payload.get("session_id") # <-- Читаем ID сессии
if user_id is None:
raise credentials_exception
if db is not None:
if session_id:
# === СТРОГИЙ РЕЖИМ ДЛЯ НОВЫХ КЛИЕНТОВ (ДОБАВИЛИ СЮДА) ===
session_result = await db.execute(
select(models.Session).where(models.Session.id == int(session_id))
)
session = session_result.scalars().first()
if session is None:
# Если сессия была удалена (например, через Logout с другого устройства)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Сессия отозвана"
)
session.last_active = datetime.now(timezone.utc)
await db.commit()
else:
# === РЕЖИМ СОВМЕСТИМОСТИ ДЛЯ СТАРЫХ КЛИЕНТОВ ===
session_result = await db.execute(
select(models.Session).where(models.Session.session_token == token)
)
session = session_result.scalars().first()
if session is None:
# ИЩЕМ УЖЕ СУЩЕСТВУЮЩУЮ ЗАГЛУШКУ ЭТОГО ПОЛЬЗОВАТЕЛЯ, ЧТОБЫ НЕ ПЛОДИТЬ НОВЫЕ
existing_legacy_result = await db.execute(
select(models.Session).where(
models.Session.user_id == int(user_id),
models.Session.ip_address == "0.0.0.0" # Признак нашей заглушки
)
)
existing_legacy = existing_legacy_result.scalars().first()
if existing_legacy:
# Просто обновляем старую заглушку (перезаписываем токен и время)
existing_legacy.session_token = token
existing_legacy.last_active = datetime.now(timezone.utc)
await db.commit()
else:
# Если заглушки еще нет вообще, создаем ОДИН раз
session = models.Session(
user_id=int(user_id),
session_token=token,
device_name="Старая версия приложения",
ip_address="0.0.0.0",
created_at=datetime.now(timezone.utc),
last_active=datetime.now(timezone.utc)
)
db.add(session)
await db.commit()
else:
session.last_active = datetime.now(timezone.utc)
await db.commit()
return user_id, session_id
except JWTError:
raise credentials_exception