from fastapi import HTTPException, status, APIRouter, WebSocket, WebSocketDisconnect, Query, Depends from app.core.security import test_token from typing import Dict from datetime import datetime, timezone import json import asyncio from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, update, func from app.db import models from firebase_admin import messaging, credentials, exceptions import firebase_admin from app.core.config import config import uuid cred = credentials.Certificate(config.FIREBASE_CREDENTIALS_PATH) firebase_admin.initialize_app(cred) # Асинхронный генератор сессий БД async def get_db(): # Убедитесь, что в models у вас настроен асинхронный SessionLocal async with models.AsyncSessionLocal() as db: try: yield db finally: await db.close() wsRouter = APIRouter( prefix='/ws' ) @wsRouter.websocket("") async def websocket_endpoint(websocket: WebSocket, token: str = Query(None), db: AsyncSession = Depends(get_db)): if token is None: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return try: user_id, session_id = await test_token(token=token, db=db) user_id = int(user_id) except HTTPException: await websocket.close(code=status.WS_1008_POLICY_VIOLATION) return print("ПОДКЛЮЧЕНИЕ") await manager.connect(websocket, user_id, session_id) print("ПОДКЛЮЧЕНО") # АсинOverlay обновление статуса онлайн await db.execute( update(models.User) .where(models.User.id == user_id) .values(last_online=datetime.now(timezone.utc)) ) await db.commit() await manager.broadcast({ "type": "user_online", "user_id": user_id, }) try: while True: print("ОЖИДАНИЕ СООБЩЕНИЙ") data = await websocket.receive_text() message_data = json.loads(data) print(f"DEBUG: Получены данные: {message_data}") # Обновляем last_online при любой активности await db.execute( update(models.User) .where(models.User.id == user_id) .values(last_online=datetime.now(timezone.utc)) ) await db.commit() if message_data.get("type") == "set_online": # Передаем session_id в метод менеджера status_changed = await manager.set_online(user_id, session_id) # Отправляем бродкаст ТОЛЬКО если пользователь до этого был полностью оффлайн if status_changed: await manager.broadcast({ "type": "user_online", "user_id": user_id, }) continue if message_data.get("type") == "set_offline": # Передаем session_id в метод менеджера status_changed = await manager.set_offline(user_id, session_id) # Отправляем бродкаст ТОЛЬКО если закрылась ПОСЛЕДНЯЯ активная сессия if status_changed: await manager.broadcast({ "type": "user_offline", "user_id": user_id, }) continue if message_data.get("type") == "private_message": # Получаем отправителя асинхронно user_res = await db.execute(select(models.User).where(models.User.id == user_id)) user = user_res.scalars().first() receiver_id = message_data.get("receiver_id") temp_id = message_data.get("temp_id") content = message_data.get("content") content50 = message_data.get("content50") message_type = message_data.get("message_type") or "text" file_id = message_data.get("file_id") encrypted_key = message_data.get("encrypted_key") print( f"DEBUG private_message payload: temp_id={temp_id}, receiver_id={receiver_id}") if receiver_id is None or content is None: await websocket.send_json({ "type": "error", "detail": "receiver_id/content required", }) continue try: receiver_id = int(receiver_id) except (TypeError, ValueError): await websocket.send_json({ "type": "error", "detail": "receiver_id must be int", }) continue new_msg = models.Message( sender_id=user_id, receiver_id=receiver_id, content=content, message_type=message_type, file_id=file_id, encrypted_key=encrypted_key, reply_to_id=message_data.get("reply_to_id"), reply_to_text=message_data.get("reply_to_text") ) db.add(new_msg) await db.commit() await db.refresh(new_msg) print(f"DEBUG saved message: id={new_msg.id}") user = await db.merge(user) session_res = await db.execute(select(models.Session).where(models.Session.user_id == receiver_id)) sessions = session_res.scalars().all() unread_res = await db.execute( select(func.count(models.Message.id)) .where(models.Message.receiver_id == receiver_id, models.Message.read_at == None) ) unread_count = unread_res.scalar() or 0 # ACK отправителю print("Отправляем ACK отправителю") await manager.send_personal_message({ "type": "message_sent", "temp_id": temp_id, "server_id": new_msg.id, "sender_id": user_id, "receiver_id": receiver_id, "content": content, "message_type": message_type, "file_id": file_id, "encrypted_key": encrypted_key, "reply_to_id": new_msg.reply_to_id, "reply_to_text": new_msg.reply_to_text, "unread_count": unread_count, "timestamp": (new_msg.timestamp or datetime.utcnow()).isoformat(), }, str(user_id)) # Формируем пакет для получателя outgoing_message = { "id": new_msg.id, "type": "private_message", "sender_id": user_id, "receiver_id": receiver_id, "content": content, "message_type": message_type, "file_id": file_id, "encrypted_key": encrypted_key, "timestamp": (new_msg.timestamp or datetime.utcnow()).isoformat(), "reply_to_id": new_msg.reply_to_id, "reply_to_text": new_msg.reply_to_text, "unread_count": unread_count, } _sent_to_receiver = False for session in sessions: if manager.active_connections.get(str(receiver_id), {}).get(session.id): print( f"DEBUG: Пользователь {receiver_id} онлайн в сессии {session.id}, отправляем через WebSocket") sent_to_receiver = await manager.send_personal_message_to_session(outgoing_message, str(receiver_id), session.id) if sent_to_receiver: _sent_to_receiver = True print( f"DEBUG: Сообщение доставлено по сокету пользователю {receiver_id} в сессии {session.id}") else: if session.fcm_token: _sent_to_receiver = True send_fcm_notification( token=session.fcm_token, user_id=user.id, username=user.username, firstname=user.first_name, lastname=user.last_name, public_key=user.public_key, encrypted_text=content50, timestamp=new_msg.timestamp or datetime.utcnow(), unread_count=str(unread_count), message_id=str(new_msg.id) ) print( f"DEBUG: Сообщение не доставлено по сокету, отправлено FCM уведомление пользователю {receiver_id} сессия {session.id} токен {session.fcm_token}") # Если успешно доставлено по сокету — отмечаем в БД if _sent_to_receiver: try: delivered_at = datetime.utcnow() new_msg.delivered_at = delivered_at await db.commit() await manager.send_personal_message({ "type": "message_delivered", "message_id": new_msg.id, "timestamp": delivered_at.isoformat(), }, str(user_id)) except Exception as e: print(f"Ошибка при сохранении статуса доставки: {e}") await db.rollback() elif message_data.get("type") == "edit_message": message_id = message_data.get("message_id") content = message_data.get("content") if message_id is None or content is None: await websocket.send_json({"type": "error", "detail": "message_id/content required"}) continue try: message_id = int(message_id) except (TypeError, ValueError): await websocket.send_json({"type": "error", "detail": "message_id must be int"}) continue msg_res = await db.execute(select(models.Message).where(models.Message.id == message_id)) msg = msg_res.scalars().first() if msg is None or msg.sender_id != user_id: continue try: msg.content = content msg.edited_at = datetime.utcnow() await db.commit() except Exception as e: print(f"Ошибка редактирования: {e}") await db.rollback() continue event = { "type": "message_edited", "message_id": msg.id, "sender_id": msg.sender_id, "content": msg.content, "edited_at": msg.edited_at.isoformat() if msg.edited_at else None, } await manager.send_personal_message(event, str(msg.receiver_id)) await manager.send_personal_message(event, str(msg.sender_id)) elif message_data.get("type") == "delete_message": message_id = message_data.get("message_id") if message_id is None: await websocket.send_json({"type": "error", "detail": "message_id required"}) continue try: message_id = int(message_id) except (TypeError, ValueError): await websocket.send_json({"type": "error", "detail": "message_id must be int"}) continue msg_res = await db.execute(select(models.Message).where(models.Message.id == message_id)) msg = msg_res.scalars().first() if msg is None or msg.sender_id != user_id: continue receiver_id = msg.receiver_id try: await db.delete(msg) await db.commit() except Exception as e: print(f"Ошибка удаления: {e}") await db.rollback() continue event = { "type": "message_deleted", "message_id": message_id, } await manager.send_personal_message(event, str(receiver_id)) await manager.send_personal_message(event, str(user_id)) elif message_data.get("type") == "read_receipt": message_id = message_data.get("message_id") try: message_id = int(message_id) except (TypeError, ValueError): continue msg_res = await db.execute(select(models.Message).where(models.Message.id == message_id)) msg = msg_res.scalars().first() if msg is None: continue if int(msg.receiver_id) != int(user_id): continue try: read_at = datetime.utcnow() msg.read_at = read_at await db.commit() except Exception as e: print(f"Ошибка прочтения: {e}") await db.rollback() continue unread_res = await db.execute( select(func.count(models.Message.id)) .where(models.Message.receiver_id == user_id, models.Message.read_at == None) ) unread_count = unread_res.scalar() or 0 sender_id = int(msg.sender_id) await manager.send_personal_message({ "type": "message_read", "sender_id": sender_id, "receiver_id": user_id, "message_id": message_id, "unread_count": unread_count, "timestamp": read_at.isoformat(), }, str(sender_id)) await manager.send_personal_message({ "type": "message_read", "sender_id": sender_id, "receiver_id": user_id, "message_id": message_id, "unread_count": unread_count, "timestamp": read_at.isoformat(), }, str(user_id)) elif message_data.get("type") == "read_all_chat": contact_id = message_data.get("contact_id") try: contact_id = int(contact_id) except (TypeError, ValueError): continue try: read_at_time = datetime.utcnow() await db.execute( update(models.Message) .where( (models.Message.receiver_id == user_id) & (models.Message.sender_id == contact_id) & (models.Message.read_at.is_(None)) ) .values(read_at=read_at_time) ) await db.commit() await manager.send_personal_message({ "type": "all_chat_read", "sender_id": sender_id, "reader_id": user_id, "timestamp": read_at_time.isoformat(), }, str(contact_id)) await manager.send_personal_message({ "type": "all_chat_read", "sender_id": sender_id, "reader_id": user_id, "timestamp": read_at_time.isoformat(), }, str(user_id)) except Exception as e: print(f"Ошибка массового прочтения: {e}") await db.rollback() continue elif message_data.get("type") == "typing": receiver_id = message_data.get("receiver_id") if receiver_id is None: continue try: receiver_id = int(receiver_id) except (TypeError, ValueError): await websocket.send_json({ "type": "error", "detail": "receiver_id must be int", }) continue await manager.send_personal_message({"type": "typing", "sender_id": user_id}, str(receiver_id)) elif message_data.get("type") == "stop_typing": receiver_id = message_data.get("receiver_id") if receiver_id is None: continue try: receiver_id = int(receiver_id) except (TypeError, ValueError): await websocket.send_json({ "type": "error", "detail": "receiver_id must be int", }) continue await manager.send_personal_message({"type": "stop_typing", "sender_id": user_id}, str(receiver_id)) if message_data.get("type") == "call_init": receiver_id = message_data.get("receiver_id") call_id = str(uuid.uuid4()) call_data = { "type": "call_init", "call_id": call_id, "caller_username": message_data.get("caller_username"), "caller_id": str(user_id) } sent = await manager.send_personal_message(call_data, str(receiver_id)) if sent: await manager.send_personal_message({"type": "call_created", "call_id": call_id}, str(user_id)) elif message_data.get("type") in ["offer", "answer", "ice_candidate", "hangup", "decline"]: receiver_id = message_data.get("receiver_id") await manager.send_personal_message(message_data, str(receiver_id)) except WebSocketDisconnect: pass finally: await manager.disconnect(user_id, session_id) try: # Асинхронно ставим статус оффлайн при выходе await db.execute( update(models.User) .where(models.User.id == user_id) .values(last_online=datetime.now(timezone.utc)) ) await db.commit() except Exception as e: print(f"Ошибка обновления статуса при отключении: {e}") print("ОТКЛЮЧЕНИЕ") def send_fcm_notification(token, user_id, username, firstname, lastname, public_key, encrypted_text, timestamp, unread_count='1', message_id='0'): print(f"DEBUG: Отправляем FCM уведомление пользователю {user_id}") message = messaging.Message( data={ "type": "enc_message", "sender_id": str(user_id), "username": username, "firstname": firstname, "lastname": lastname, "public_key": public_key, "content": encrypted_text, "timestamp": timestamp.isoformat(), "unread_count": str(unread_count), "message_id": str(message_id), }, android=messaging.AndroidConfig(priority='high'), token=token, ) try: response = messaging.send(message) print('Successfully sent message:', response) except Exception as e: print('Unexpected error sending push:', e) class ConnectionManager: def __init__(self): self.active_connections: Dict[str, Dict[int, WebSocket]] = {} self.online_users: Dict[str, datetime] = {} async def connect(self, websocket: WebSocket, user_id: int, session_id: int): await websocket.accept() user_key = str(user_id) if user_key not in self.active_connections: self.active_connections[user_key] = {} self.active_connections[user_key][session_id] = websocket # Логика онлайна: проверяем, первый ли это вход пользователя в сеть is_first_session = user_key not in self.online_users if is_first_session: self.online_users[user_key] = {} self.online_users[user_key][session_id] = datetime.now(timezone.utc) # Если статус изменился на онлайн, сразу делаем рассылку if is_first_session: await self.broadcast({ "type": "user_online", "user_id": user_id, }) async def disconnect(self, user_id: int, session_id: int): user_key = str(user_id) # 1. Удаляем соединение из active_connections if user_key in self.active_connections: self.active_connections[user_key].pop(session_id, None) if not self.active_connections[user_key]: self.active_connections.pop(user_key, None) # 2. Удаляем сессию из online_users if user_key in self.online_users: self.online_users[user_key].pop(session_id, None) # Если это была последняя активная сессия пользователя if not self.online_users[user_key]: self.online_users.pop(user_key, None) # Отправляем бродкаст: пользователь полностью ушел в офлайн await self.broadcast({ "type": "user_offline", "user_id": user_id, }) async def send_personal_message(self, message: dict, user_id: str) -> bool: user_key = str(user_id) if user_key in self.active_connections: success = False for session_id, websocket in list(self.active_connections[user_key].items()): try: print( f"DEBUG: Отправляем сообщение пользователю {user_id} в сессии {session_id} через WebSocket") await websocket.send_json(message) success = True except Exception: await self.disconnect(int(user_id), session_id) return success return False async def send_personal_message_to_session(self, message: dict, user_id: str, session_id: int) -> bool: user_key = str(user_id) if user_key in self.active_connections: if session_id in self.active_connections[user_key]: try: await self.active_connections[user_key][session_id].send_json(message) return True except Exception: await self.disconnect(int(user_id), session_id) return False async def kill_session_socket(self, user_id: int, session_id: int): user_key = str(user_id) if user_key in self.active_connections: if session_id in self.active_connections[user_key]: ws = self.active_connections[user_key][session_id] try: await ws.send_json({"type": "session_terminated"}) await asyncio.sleep(0.1) await ws.close(code=status.WS_1008_POLICY_VIOLATION) except Exception as e: print(f"Ошибка при закрытии сокета: {e}") try: del self.active_connections[user_key][session_id] except Exception as e: print(f"Ошибка при удалении сокета: {e}") async def broadcast(self, message: dict): for connection in self.active_connections.values(): for websocket in connection.values(): try: await websocket.send_json(message) except Exception: pass async def set_online(self, user_id: str, session_id: int) -> bool: u_id = str(user_id) if u_id not in self.active_connections: return False # Если пользователя вообще не было в онлайн-списке, значит статус МЕНЯЕТСЯ is_first_session = u_id not in self.online_users if is_first_session: self.online_users[u_id] = {} self.online_users[u_id][session_id] = datetime.now(timezone.utc) return is_first_session # True, если юзер стал онлайн глобально async def set_offline(self, user_id: str, session_id: int) -> bool: u_id = str(user_id) if u_id not in self.online_users: return False # Удаляем текущую сессию self.online_users[u_id].pop(session_id, None) # Если после удаления этой сессии у юзера больше НЕТ других сессий if not self.online_users[u_id]: del self.online_users[u_id] return True # Статус ИЗМЕНИЛСЯ, пользователь полностью ушел в оффлайн return False # Сессия закрыта, но юзер всё еще онлайн с другого устройства manager = ConnectionManager()