from fastapi import FastAPI, Depends, HTTPException, status, Request from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from datetime import datetime, timedelta, timezone from jose import jwt import hashlib from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from app.db import models from dotenv import load_dotenv from jose import JWTError, jwt import os import bcrypt from sqlalchemy.exc import IntegrityError from typing import Optional load_dotenv() SECRET_KEY = os.getenv("JWT_KEY") if not SECRET_KEY: raise RuntimeError("JWT_KEY environment variable not set") SECRET_KEY = SECRET_KEY.strip() ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 60 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/login-oauth", description=( "### Инструкция по авторизации\n\n" "1. Введите ваш **username** и **password**.\n" "2. В поле **client_secret** введите текущий **6-значный код TOTP**, если он подключен\n" "3. Поле *client_id* оставьте пустым." )) # бд async def get_db(): async with models.AsyncSessionLocal() as db: try: yield db finally: await db.close() def verify_password(plain_password, hashed_password): try: return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password) except TypeError: return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8')) def get_password_hash(password): return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) def create_access_token(data: dict, session_id: int = None): to_encode = data.copy() if session_id: to_encode.update({"session_id": session_id}) expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": int(expire.timestamp())}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def create_refresh_token(data: dict, session_id: int = None): to_encode = data.copy() if session_id: to_encode.update({"session_id": session_id}) expire = datetime.now(timezone.utc) + timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": int(expire.timestamp())}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def _parse_device_name(request: Request) -> str: device_name = request.headers.get("x-device-name") if device_name: return device_name user_agent = request.headers.get("user-agent", "").lower() if "android" in user_agent: return "Android Device" elif "iphone" in user_agent or "ipad" in user_agent: return "iOS Device" elif "windows" in user_agent: return "Windows PC" elif "macintosh" in user_agent: return "macOS Device" elif "linux" in user_agent: return "Linux PC" return "Неизвестное устройство" # ПРОВЕРКА ТОКЕНА ДЛЯ HTTP ЗАПРОСОВ async def get_current_user( request: Request, token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db) ): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id: str = payload.get("sub") session_id: Optional[int] = payload.get( "session_id") # <-- Читаем ID сессии if user_id is None: raise credentials_exception if db is not None: if session_id: # === СТРОГИЙ РЕЖИМ ДЛЯ НОВЫХ КЛИЕНТОВ === session_result = await db.execute( select(models.Session).where( models.Session.id == int(session_id)) ) session = session_result.scalars().first() if session is None: # Если сессия была удалена (например, через Logout на другом устройстве) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Сессия отозвана") session.last_active = datetime.now(timezone.utc) await db.commit() else: # === РЕЖИМ СОВМЕСТИМОСТИ ДЛЯ СТАРЫХ КЛИЕНТОВ === session_result = await db.execute( select(models.Session).where(models.Session.session_token == token) ) session = session_result.scalars().first() if session is None: # ИЩЕМ УЖЕ СУЩЕСТВУЮЩУЮ ЗАГЛУШКУ ЭТОГО ПОЛЬЗОВАТЕЛЯ, ЧТОБЫ НЕ ПЛОДИТЬ НОВЫЕ existing_legacy_result = await db.execute( select(models.Session).where( models.Session.user_id == int(user_id), models.Session.ip_address == "0.0.0.0" # Признак нашей заглушки ) ) existing_legacy = existing_legacy_result.scalars().first() if existing_legacy: # Просто обновляем старую заглушку (перезаписываем токен и время) existing_legacy.session_token = token existing_legacy.last_active = datetime.now(timezone.utc) await db.commit() else: # Если заглушки еще нет вообще, создаем ОДИН раз session = models.Session( user_id=int(user_id), session_token=token, device_name="Старая версия приложения", ip_address="0.0.0.0", created_at=datetime.now(timezone.utc), last_active=datetime.now(timezone.utc) ) db.add(session) await db.commit() else: session.last_active = datetime.now(timezone.utc) await db.commit() result = await db.execute(select(models.User).where(models.User.id == user_id)) user = result.scalars().first() if user is None: raise credentials_exception if getattr(user, "is_blocked", 0) == 1: raise HTTPException(status_code=403, detail="Ваш аккаунт заблокирован") return user except JWTError: raise credentials_exception # ПРОВЕРКА ТОКЕНА ДЛЯ ВЕБСОКЕТОВ async def test_token(token: str, db: AsyncSession = None): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id: str = payload.get("sub") session_id: Optional[int] = payload.get("session_id") # <-- Читаем ID сессии if user_id is None: raise credentials_exception if db is not None: if session_id: # === СТРОГИЙ РЕЖИМ ДЛЯ НОВЫХ КЛИЕНТОВ (ДОБАВИЛИ СЮДА) === session_result = await db.execute( select(models.Session).where(models.Session.id == int(session_id)) ) session = session_result.scalars().first() if session is None: # Если сессия была удалена (например, через Logout с другого устройства) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Сессия отозвана" ) session.last_active = datetime.now(timezone.utc) await db.commit() else: # === РЕЖИМ СОВМЕСТИМОСТИ ДЛЯ СТАРЫХ КЛИЕНТОВ === session_result = await db.execute( select(models.Session).where(models.Session.session_token == token) ) session = session_result.scalars().first() if session is None: # ИЩЕМ УЖЕ СУЩЕСТВУЮЩУЮ ЗАГЛУШКУ ЭТОГО ПОЛЬЗОВАТЕЛЯ, ЧТОБЫ НЕ ПЛОДИТЬ НОВЫЕ existing_legacy_result = await db.execute( select(models.Session).where( models.Session.user_id == int(user_id), models.Session.ip_address == "0.0.0.0" # Признак нашей заглушки ) ) existing_legacy = existing_legacy_result.scalars().first() if existing_legacy: # Просто обновляем старую заглушку (перезаписываем токен и время) existing_legacy.session_token = token existing_legacy.last_active = datetime.now(timezone.utc) await db.commit() else: # Если заглушки еще нет вообще, создаем ОДИН раз session = models.Session( user_id=int(user_id), session_token=token, device_name="Старая версия приложения", ip_address="0.0.0.0", created_at=datetime.now(timezone.utc), last_active=datetime.now(timezone.utc) ) db.add(session) await db.commit() else: session.last_active = datetime.now(timezone.utc) await db.commit() return user_id, session_id except JWTError: raise credentials_exception