Chepuhagram/srv/app/api/endpoints/admin.py

221 lines
8.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
from app.db import models
from app.api import schemas
from app.core.security import get_current_user, get_password_hash
from sqlalchemy.exc import IntegrityError
from pydantic import BaseModel
from datetime import datetime, timezone
from app.services.notification_service import send_system_notification
from app.api.endpoints.users import _delete_old_avatar_file
from app.websocket import connection_manager
adminRouter = APIRouter(
prefix="/admin", tags=["admin"], include_in_schema=False)
async def get_db():
async with models.AsyncSessionLocal() as db:
try:
yield db
finally:
await db.close()
def require_admin(current_user: models.User = Depends(get_current_user)):
if current_user.id != 1:
raise HTTPException(
status_code=403, detail="Доступ запрещен. Требуются права супер-администратора.")
return current_user
@adminRouter.get("/users", response_model=List[schemas.AdminUserListItem])
async def get_all_users(db: AsyncSession = Depends(get_db), admin: models.User = Depends(require_admin)):
result = await db.execute(select(models.User))
return result.scalars().all()
@adminRouter.post("/users")
async def admin_create_user(user_data: schemas.AdminCreateUser, db: AsyncSession = Depends(get_db), admin: models.User = Depends(require_admin)):
result = await db.execute(select(models.User).where(models.User.username == user_data.username))
if result.scalars().first():
raise HTTPException(
status_code=400, detail="Пользователь с таким именем уже существует")
if user_data.id is not None:
result_id = await db.execute(select(models.User).where(models.User.id == user_data.id))
if result_id.scalars().first():
raise HTTPException(
status_code=400, detail="Пользователь с таким ID уже существует")
hashed_pw = get_password_hash(user_data.password)
new_user = models.User(
username=user_data.username,
hashed_password=hashed_pw,
first_name=user_data.first_name,
last_name=user_data.last_name,
is_blocked=0
)
if user_data.id is not None:
new_user.id = user_data.id
db.add(new_user)
await db.commit()
await db.refresh(new_user)
return {"status": "ok", "user_id": new_user.id}
@adminRouter.post("/users/{user_id}/block")
async def block_user(user_id: int, db: AsyncSession = Depends(get_db), admin: models.User = Depends(require_admin)):
if user_id == 1:
raise HTTPException(
status_code=400, detail="Нельзя заблокировать главного администратора")
result = await db.execute(select(models.User).where(models.User.id == user_id))
user = result.scalars().first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
user.is_blocked = 1
await db.commit()
return {"status": "ok", "message": f"Пользователь {user_id} заблокирован"}
@adminRouter.post("/users/{user_id}/unblock")
async def unblock_user(user_id: int, db: AsyncSession = Depends(get_db), admin: models.User = Depends(require_admin)):
result = await db.execute(select(models.User).where(models.User.id == user_id))
user = result.scalars().first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
user.is_blocked = 0
await db.commit()
return {"status": "ok", "message": f"Пользователь {user_id} разблокирован"}
@adminRouter.put("/users/{user_id}/profile")
async def admin_update_user_profile(
user_id: int,
profile_data: schemas.UpdateMe,
db: AsyncSession = Depends(get_db),
admin: models.User = Depends(require_admin)
):
result = await db.execute(select(models.User).where(models.User.id == user_id))
user = result.scalars().first()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
if profile_data.username and profile_data.username.strip() != '':
dup_res = await db.execute(
select(models.User).where(models.User.username ==
profile_data.username, models.User.id != user_id)
)
if dup_res.scalars().first():
raise HTTPException(
status_code=400, detail="Имя пользователя уже занято")
update_dict = profile_data.model_dump(exclude_unset=True)
for key, value in update_dict.items():
setattr(user, key, value)
# 4. Сохранение изменений
try:
await db.commit()
except IntegrityError:
await db.rollback()
raise HTTPException(
status_code=400,
detail="Ошибка: такой телефон или email уже используется другим аккаунтом"
)
return {"status": "ok", "message": "Профиль успешно изменен администратором"}
@adminRouter.put("/users/{user_id}/avatar")
async def update_user_avatar(
user_id: int,
data: dict,
current_user: models.User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
admin: models.User = Depends(require_admin)
):
user_to_update = await db.execute(select(models.User).where(models.User.id == user_id))
user_to_update = user_to_update.scalars().first()
if not user_to_update:
raise HTTPException(status_code=404, detail="Пользователь не найден")
avatar_file_id = data.get("avatar_file_id")
if avatar_file_id:
old_avatar_file_id = user_to_update.avatar_file_id
if old_avatar_file_id and old_avatar_file_id != avatar_file_id:
await _delete_old_avatar_file(old_avatar_file_id, db)
user_to_update.avatar_file_id = avatar_file_id
await db.commit()
print(
f"Пользователь {user_to_update.id} обновил аватар: {avatar_file_id}")
else:
raise HTTPException(
status_code=400, detail="avatar_file_id is required")
await connection_manager.manager.broadcast({'type': 'user_updated', 'user_id': user_id})
return {"message": "Avatar updated"}
class BroadcastRequest(BaseModel):
content: str
user_ids: Optional[List[int]] = None # None = всем пользователям
@adminRouter.post("/broadcast")
async def send_broadcast(
data: BroadcastRequest,
current_user: models.User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
admin: models.User = Depends(require_admin)
):
"""
Отправляет рассылку от системного аккаунта (юзер 0).
Доступно только администратору (юзер 1).
"""
if current_user.id != 1:
raise HTTPException(
status_code=403, detail="Только администратор может отправлять рассылки")
# Определяем получателей
if data.user_ids:
# Отправляем определенным пользователям
receiver_res = await db.execute(
select(models.User).where(
(models.User.id.in_(data.user_ids)) & (models.User.id != 0)
)
)
recipients = receiver_res.scalars().all()
else:
# Отправляем всем (кроме системного аккаунта)
receiver_res = await db.execute(
select(models.User).where(models.User.id != 0)
)
recipients = receiver_res.scalars().all()
# Создаем сообщения для каждого получателя
created_messages = []
for recipient in recipients:
await send_system_notification(
db=db,
receiver_id=recipient.id,
plain_text=data.content
)
return {
"status": "ok",
"message": f"Рассылка отправлена {len(recipients)} пользователям",
"count": len(recipients),
}