Chepuhagram/srv/app/websocket/connection_manager.py

651 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import HTTPException, status, APIRouter, WebSocket, WebSocketDisconnect, Query, Depends
from app.core.security import test_token
from typing import Dict
from datetime import datetime, timezone
import json
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, func
from app.db import models
from firebase_admin import messaging, credentials, exceptions
import firebase_admin
from app.core.config import config
import uuid
cred = credentials.Certificate(config.FIREBASE_CREDENTIALS_PATH)
firebase_admin.initialize_app(cred)
# Асинхронный генератор сессий БД
async def get_db():
# Убедитесь, что в models у вас настроен асинхронный SessionLocal
async with models.AsyncSessionLocal() as db:
try:
yield db
finally:
await db.close()
wsRouter = APIRouter(
prefix='/ws'
)
@wsRouter.websocket("/qr")
async def qr_websocket_endpoint(websocket: WebSocket, room_id: str = Query(...)):
print(f"QR сопряжение: новое соединение для комнаты {room_id}")
await manager.connect_qr(websocket, room_id)
try:
while True:
data = await websocket.receive_text()
message_data = json.loads(data)
if message_data.get("type") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
pass
finally:
manager.disconnect_qr(room_id)
print(f"QR сопряжение: отключено соединение для комнаты {room_id}")
@wsRouter.websocket("")
async def websocket_endpoint(websocket: WebSocket, token: str = Query(None), db: AsyncSession = Depends(get_db)):
if token is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
try:
user_id, session_id = await test_token(token=token, db=db)
user_id = int(user_id)
except HTTPException:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
print("ПОДКЛЮЧЕНИЕ")
await manager.connect(websocket, user_id, session_id)
print("ПОДКЛЮЧЕНО")
# АсинOverlay обновление статуса онлайн
await db.execute(
update(models.User)
.where(models.User.id == user_id)
.values(last_online=datetime.now(timezone.utc))
)
await db.commit()
await manager.broadcast({
"type": "user_online",
"user_id": user_id,
})
try:
while True:
print("ОЖИДАНИЕ СООБЩЕНИЙ")
data = await websocket.receive_text()
message_data = json.loads(data)
print(f"DEBUG: Получены данные: {message_data}")
# Обновляем last_online при любой активности
await db.execute(
update(models.User)
.where(models.User.id == user_id)
.values(last_online=datetime.now(timezone.utc))
)
await db.commit()
if message_data.get("type") == "set_online":
# Передаем session_id в метод менеджера
status_changed = await manager.set_online(user_id, session_id)
# Отправляем бродкаст ТОЛЬКО если пользователь до этого был полностью оффлайн
if status_changed:
await manager.broadcast({
"type": "user_online",
"user_id": user_id,
})
continue
if message_data.get("type") == "set_offline":
# Передаем session_id в метод менеджера
status_changed = await manager.set_offline(user_id, session_id)
# Отправляем бродкаст ТОЛЬКО если закрылась ПОСЛЕДНЯЯ активная сессия
if status_changed:
await manager.broadcast({
"type": "user_offline",
"user_id": user_id,
})
continue
if message_data.get("type") == "private_message":
# Получаем отправителя асинхронно
user_res = await db.execute(select(models.User).where(models.User.id == user_id))
user = user_res.scalars().first()
receiver_id = message_data.get("receiver_id")
temp_id = message_data.get("temp_id")
content = message_data.get("content")
content50 = message_data.get("content50")
message_type = message_data.get("message_type") or "text"
file_id = message_data.get("file_id")
encrypted_key = message_data.get("encrypted_key")
print(
f"DEBUG private_message payload: temp_id={temp_id}, receiver_id={receiver_id}")
if receiver_id is None or content is None:
await websocket.send_json({
"type": "error",
"detail": "receiver_id/content required",
})
continue
try:
receiver_id = int(receiver_id)
except (TypeError, ValueError):
await websocket.send_json({
"type": "error",
"detail": "receiver_id must be int",
})
continue
new_msg = models.Message(
sender_id=user_id,
receiver_id=receiver_id,
content=content,
message_type=message_type,
file_id=file_id,
encrypted_key=encrypted_key,
reply_to_id=message_data.get("reply_to_id"),
reply_to_text=message_data.get("reply_to_text")
)
db.add(new_msg)
await db.commit()
await db.refresh(new_msg)
print(f"DEBUG saved message: id={new_msg.id}")
user = await db.merge(user)
session_res = await db.execute(select(models.Session).where(models.Session.user_id == receiver_id))
sessions = session_res.scalars().all()
unread_res = await db.execute(
select(func.count(models.Message.id))
.where(models.Message.receiver_id == receiver_id, models.Message.read_at == None)
)
unread_count = unread_res.scalar() or 0
# ACK отправителю
print("Отправляем ACK отправителю")
await manager.send_personal_message({
"type": "message_sent",
"temp_id": temp_id,
"server_id": new_msg.id,
"sender_id": user_id,
"receiver_id": receiver_id,
"content": content,
"message_type": message_type,
"file_id": file_id,
"encrypted_key": encrypted_key,
"reply_to_id": new_msg.reply_to_id,
"reply_to_text": new_msg.reply_to_text,
"unread_count": unread_count,
"timestamp": (new_msg.timestamp or datetime.utcnow()).isoformat(),
}, str(user_id))
# Формируем пакет для получателя
outgoing_message = {
"id": new_msg.id,
"type": "private_message",
"sender_id": user_id,
"receiver_id": receiver_id,
"content": content,
"message_type": message_type,
"file_id": file_id,
"encrypted_key": encrypted_key,
"timestamp": (new_msg.timestamp or datetime.utcnow()).isoformat(),
"reply_to_id": new_msg.reply_to_id,
"reply_to_text": new_msg.reply_to_text,
"unread_count": unread_count,
}
_sent_to_receiver = False
for session in sessions:
if manager.active_connections.get(str(receiver_id), {}).get(session.id):
print(
f"DEBUG: Пользователь {receiver_id} онлайн в сессии {session.id}, отправляем через WebSocket")
sent_to_receiver = await manager.send_personal_message_to_session(outgoing_message, str(receiver_id), session.id)
if sent_to_receiver:
_sent_to_receiver = True
print(
f"DEBUG: Сообщение доставлено по сокету пользователю {receiver_id} в сессии {session.id}")
else:
if session.fcm_token:
_sent_to_receiver = True
send_fcm_notification(
token=session.fcm_token,
user_id=user.id,
username=user.username,
firstname=user.first_name,
lastname=user.last_name,
public_key=user.public_key,
encrypted_text=content50,
timestamp=new_msg.timestamp or datetime.utcnow(),
unread_count=str(unread_count),
message_id=str(new_msg.id)
)
print(
f"DEBUG: Сообщение не доставлено по сокету, отправлено FCM уведомление пользователю {receiver_id} сессия {session.id} токен {session.fcm_token}")
# Если успешно доставлено по сокету — отмечаем в БД
if _sent_to_receiver:
try:
delivered_at = datetime.utcnow()
new_msg.delivered_at = delivered_at
await db.commit()
await manager.send_personal_message({
"type": "message_delivered",
"message_id": new_msg.id,
"timestamp": delivered_at.isoformat(),
}, str(user_id))
except Exception as e:
print(f"Ошибка при сохранении статуса доставки: {e}")
await db.rollback()
elif message_data.get("type") == "edit_message":
message_id = message_data.get("message_id")
content = message_data.get("content")
if message_id is None or content is None:
await websocket.send_json({"type": "error", "detail": "message_id/content required"})
continue
try:
message_id = int(message_id)
except (TypeError, ValueError):
await websocket.send_json({"type": "error", "detail": "message_id must be int"})
continue
msg_res = await db.execute(select(models.Message).where(models.Message.id == message_id))
msg = msg_res.scalars().first()
if msg is None or msg.sender_id != user_id:
continue
try:
msg.content = content
msg.edited_at = datetime.utcnow()
await db.commit()
except Exception as e:
print(f"Ошибка редактирования: {e}")
await db.rollback()
continue
event = {
"type": "message_edited",
"message_id": msg.id,
"sender_id": msg.sender_id,
"content": msg.content,
"edited_at": msg.edited_at.isoformat() if msg.edited_at else None,
}
await manager.send_personal_message(event, str(msg.receiver_id))
await manager.send_personal_message(event, str(msg.sender_id))
elif message_data.get("type") == "delete_message":
message_id = message_data.get("message_id")
if message_id is None:
await websocket.send_json({"type": "error", "detail": "message_id required"})
continue
try:
message_id = int(message_id)
except (TypeError, ValueError):
await websocket.send_json({"type": "error", "detail": "message_id must be int"})
continue
msg_res = await db.execute(select(models.Message).where(models.Message.id == message_id))
msg = msg_res.scalars().first()
if msg is None or msg.sender_id != user_id:
continue
receiver_id = msg.receiver_id
try:
await db.delete(msg)
await db.commit()
except Exception as e:
print(f"Ошибка удаления: {e}")
await db.rollback()
continue
event = {
"type": "message_deleted",
"message_id": message_id,
}
await manager.send_personal_message(event, str(receiver_id))
await manager.send_personal_message(event, str(user_id))
elif message_data.get("type") == "read_receipt":
message_id = message_data.get("message_id")
try:
message_id = int(message_id)
except (TypeError, ValueError):
continue
msg_res = await db.execute(select(models.Message).where(models.Message.id == message_id))
msg = msg_res.scalars().first()
if msg is None:
continue
if int(msg.receiver_id) != int(user_id):
continue
try:
read_at = datetime.utcnow()
msg.read_at = read_at
await db.commit()
except Exception as e:
print(f"Ошибка прочтения: {e}")
await db.rollback()
continue
unread_res = await db.execute(
select(func.count(models.Message.id))
.where(models.Message.receiver_id == user_id, models.Message.read_at == None)
)
unread_count = unread_res.scalar() or 0
sender_id = int(msg.sender_id)
await manager.send_personal_message({
"type": "message_read",
"sender_id": sender_id,
"receiver_id": user_id,
"message_id": message_id,
"unread_count": unread_count,
"timestamp": read_at.isoformat(),
}, str(sender_id))
await manager.send_personal_message({
"type": "message_read",
"sender_id": sender_id,
"receiver_id": user_id,
"message_id": message_id,
"unread_count": unread_count,
"timestamp": read_at.isoformat(),
}, str(user_id))
elif message_data.get("type") == "read_all_chat":
contact_id = message_data.get("contact_id")
try:
contact_id = int(contact_id)
except (TypeError, ValueError):
continue
try:
read_at_time = datetime.utcnow()
await db.execute(
update(models.Message)
.where(
(models.Message.receiver_id == user_id) &
(models.Message.sender_id == contact_id) &
(models.Message.read_at.is_(None))
)
.values(read_at=read_at_time)
)
await db.commit()
await manager.send_personal_message({
"type": "all_chat_read",
"sender_id": sender_id,
"reader_id": user_id,
"timestamp": read_at_time.isoformat(),
}, str(contact_id))
await manager.send_personal_message({
"type": "all_chat_read",
"sender_id": sender_id,
"reader_id": user_id,
"timestamp": read_at_time.isoformat(),
}, str(user_id))
except Exception as e:
print(f"Ошибка массового прочтения: {e}")
await db.rollback()
continue
elif message_data.get("type") == "typing":
receiver_id = message_data.get("receiver_id")
if receiver_id is None:
continue
try:
receiver_id = int(receiver_id)
except (TypeError, ValueError):
await websocket.send_json({
"type": "error",
"detail": "receiver_id must be int",
})
continue
await manager.send_personal_message({"type": "typing", "sender_id": user_id}, str(receiver_id))
elif message_data.get("type") == "stop_typing":
receiver_id = message_data.get("receiver_id")
if receiver_id is None:
continue
try:
receiver_id = int(receiver_id)
except (TypeError, ValueError):
await websocket.send_json({
"type": "error",
"detail": "receiver_id must be int",
})
continue
await manager.send_personal_message({"type": "stop_typing", "sender_id": user_id}, str(receiver_id))
if message_data.get("type") == "call_init":
receiver_id = message_data.get("receiver_id")
call_id = str(uuid.uuid4())
call_data = {
"type": "call_init",
"call_id": call_id,
"caller_username": message_data.get("caller_username"),
"caller_id": str(user_id)
}
sent = await manager.send_personal_message(call_data, str(receiver_id))
if sent:
await manager.send_personal_message({"type": "call_created", "call_id": call_id}, str(user_id))
elif message_data.get("type") in ["offer", "answer", "ice_candidate", "hangup", "decline"]:
receiver_id = message_data.get("receiver_id")
await manager.send_personal_message(message_data, str(receiver_id))
except WebSocketDisconnect:
pass
finally:
await manager.disconnect(user_id, session_id)
try:
# Асинхронно ставим статус оффлайн при выходе
await db.execute(
update(models.User)
.where(models.User.id == user_id)
.values(last_online=datetime.now(timezone.utc))
)
await db.commit()
except Exception as e:
print(f"Ошибка обновления статуса при отключении: {e}")
print("ОТКЛЮЧЕНИЕ")
def send_fcm_notification(token, user_id, username, firstname, lastname, public_key, encrypted_text, timestamp, unread_count='1', message_id='0'):
print(f"DEBUG: Отправляем FCM уведомление пользователю {user_id}")
message = messaging.Message(
data={
"type": "enc_message",
"sender_id": str(user_id),
"username": username,
"firstname": firstname,
"lastname": lastname,
"public_key": public_key,
"content": encrypted_text,
"timestamp": timestamp.isoformat(),
"unread_count": str(unread_count),
"message_id": str(message_id),
},
android=messaging.AndroidConfig(priority='high'),
token=token,
)
try:
response = messaging.send(message)
print('Successfully sent message:', response)
except Exception as e:
print('Unexpected error sending push:', e)
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, Dict[int, WebSocket]] = {}
self.online_users: Dict[str, datetime] = {}
self.qr_connections: Dict[str, WebSocket] = {}
async def connect_qr(self, websocket: WebSocket, room_id: str):
await websocket.accept()
self.qr_connections[room_id] = websocket
def disconnect_qr(self, room_id: str):
self.qr_connections.pop(room_id, None)
async def send_qr_message(self, room_id: str, message: dict) -> bool:
if room_id in self.qr_connections:
try:
await self.qr_connections[room_id].send_json(message)
return True
except Exception:
self.disconnect_qr(room_id)
return False
async def connect(self, websocket: WebSocket, user_id: int, session_id: int):
await websocket.accept()
user_key = str(user_id)
if user_key not in self.active_connections:
self.active_connections[user_key] = {}
self.active_connections[user_key][session_id] = websocket
# Логика онлайна: проверяем, первый ли это вход пользователя в сеть
is_first_session = user_key not in self.online_users
if is_first_session:
self.online_users[user_key] = {}
self.online_users[user_key][session_id] = datetime.now(timezone.utc)
# Если статус изменился на онлайн, сразу делаем рассылку
if is_first_session:
await self.broadcast({
"type": "user_online",
"user_id": user_id,
})
async def disconnect(self, user_id: int, session_id: int):
user_key = str(user_id)
# 1. Удаляем соединение из active_connections
if user_key in self.active_connections:
self.active_connections[user_key].pop(session_id, None)
if not self.active_connections[user_key]:
self.active_connections.pop(user_key, None)
# 2. Удаляем сессию из online_users
if user_key in self.online_users:
self.online_users[user_key].pop(session_id, None)
# Если это была последняя активная сессия пользователя
if not self.online_users[user_key]:
self.online_users.pop(user_key, None)
# Отправляем бродкаст: пользователь полностью ушел в офлайн
await self.broadcast({
"type": "user_offline",
"user_id": user_id,
})
async def send_personal_message(self, message: dict, user_id: str) -> bool:
user_key = str(user_id)
if user_key in self.active_connections:
success = False
for session_id, websocket in list(self.active_connections[user_key].items()):
try:
print(
f"DEBUG: Отправляем сообщение пользователю {user_id} в сессии {session_id} через WebSocket")
await websocket.send_json(message)
success = True
except Exception:
await self.disconnect(int(user_id), session_id)
return success
return False
async def send_personal_message_to_session(self, message: dict, user_id: str, session_id: int) -> bool:
user_key = str(user_id)
if user_key in self.active_connections:
if session_id in self.active_connections[user_key]:
try:
await self.active_connections[user_key][session_id].send_json(message)
return True
except Exception:
await self.disconnect(int(user_id), session_id)
return False
async def kill_session_socket(self, user_id: int, session_id: int):
user_key = str(user_id)
if user_key in self.active_connections:
if session_id in self.active_connections[user_key]:
ws = self.active_connections[user_key][session_id]
try:
await ws.send_json({"type": "session_terminated"})
await asyncio.sleep(0.1)
await ws.close(code=status.WS_1008_POLICY_VIOLATION)
except Exception as e:
print(f"Ошибка при закрытии сокета: {e}")
try:
del self.active_connections[user_key][session_id]
except Exception as e:
print(f"Ошибка при удалении сокета: {e}")
async def broadcast(self, message: dict):
for connection in self.active_connections.values():
for websocket in connection.values():
try:
await websocket.send_json(message)
except Exception:
pass
async def set_online(self, user_id: str, session_id: int) -> bool:
u_id = str(user_id)
if u_id not in self.active_connections:
return False
# Если пользователя вообще не было в онлайн-списке, значит статус МЕНЯЕТСЯ
is_first_session = u_id not in self.online_users
if is_first_session:
self.online_users[u_id] = {}
self.online_users[u_id][session_id] = datetime.now(timezone.utc)
return is_first_session # True, если юзер стал онлайн глобально
async def set_offline(self, user_id: str, session_id: int) -> bool:
u_id = str(user_id)
if u_id not in self.online_users:
return False
# Удаляем текущую сессию
self.online_users[u_id].pop(session_id, None)
# Если после удаления этой сессии у юзера больше НЕТ других сессий
if not self.online_users[u_id]:
del self.online_users[u_id]
return True # Статус ИЗМЕНИЛСЯ, пользователь полностью ушел в оффлайн
return False # Сессия закрыта, но юзер всё еще онлайн с другого устройства
manager = ConnectionManager()