import io import os import uuid import urllib.parse from fastapi import Depends, HTTPException, status, APIRouter, File, UploadFile, Form from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session from sqlalchemy.sql import func from google.oauth2.credentials import Credentials import asyncio from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseUpload, MediaIoBaseDownload from app.core.security import get_current_user from app.db import models from app.core.config import config mediaRouter = APIRouter( prefix='/media', tags=['media'], ) # --------------------------------------------------------------------------- # Инициализация клиента Google Drive # --------------------------------------------------------------------------- def _get_drive_service(): try: credentials = Credentials( token=None, refresh_token=config.GOOGLE_REFRESH_TOKEN, token_uri='https://oauth2.googleapis.com/token', client_id=config.GOOGLE_CLIENT_ID, client_secret=config.GOOGLE_CLIENT_SECRET, scopes=['https://www.googleapis.com/auth/drive'] ) return build('drive', 'v3', credentials=credentials) except Exception as e: raise HTTPException( status_code=500, detail=f"Failed to initialize Google Drive service: {str(e)}" ) # --------------------------------------------------------------------------- # Контроль квоты пользователя (с удалением старых файлов) # --------------------------------------------------------------------------- def _cleanup_google_drive_quota(db: Session, owner_id: int | None, new_file_size: int): """ Проверяет квоту пользователя (из .env). Если лимит превышен, автоматически удаляет самые старые файлы из Google Drive и таблицы media_items, пока не освободится достаточно места для нового файла. """ if owner_id is None: return # Получаем лимит квоты из конфигурации приложения (.env) user_quota = getattr(config, "HOME_USER_QUOTA_BYTES", 10737418240) # 10 ГБ по умолчанию user = db.query(models.User).filter(models.User.id == owner_id).first() active_avatar_id = user.avatar_file_id if user else None sum_query = db.query(func.sum(models.MediaItem.size_bytes)).filter( models.MediaItem.owner_id == owner_id ) if active_avatar_id: sum_query = sum_query.filter(models.MediaItem.file_id != active_avatar_id) total_used = sum_query.scalar() or 0 total_used = int(total_used) # Если вместе с новым файлом мы укладываемся в квоту — чистка не требуется if total_used + new_file_size <= user_quota: return # 2. Если места не хватает, выбираем файлы пользователя от старых к новым files = db.query(models.MediaItem).filter( models.MediaItem.owner_id == owner_id ).order_by(models.MediaItem.created_at.asc()).all() service = _get_drive_service() for file_record in files: # Удаляем старые файлы до тех пор, пока новый файл не поместится в квоту if total_used + new_file_size <= user_quota: break # Физическое удаление файла из Google Drive try: drive_id = file_record.storage_file_id service.files().delete(fileId=drive_id).execute() except Exception: # Если файл уже удален из Google Drive вручную, игнорируем ошибку pass # Корректируем счетчик занятого места и удаляем запись из БД total_used -= file_record.size_bytes db.delete(file_record) db.commit() # --------------------------------------------------------------------------- # Эндпоинты: Загрузка (Upload) # --------------------------------------------------------------------------- class SeekableFastAPIStream(io.RawIOBase): """ Обертка над синхронным внутренним файлом FastAPI. Удовлетворяет требованиям Google SDK по наличию методов seek/tell. """ def __init__(self, raw_file): self.raw_file = raw_file self._position = 0 def readinto(self, b): # file.file.read в FastAPI работает СИНХРОННО chunk = self.raw_file.read(len(b)) if not chunk: return 0 n = len(chunk) b[:n] = chunk self._position += n return n def seek(self, offset, whence=io.SEEK_SET): if whence == io.SEEK_SET and offset == 0: try: self.raw_file.seek(0) except Exception: pass self._position = 0 return 0 elif whence == io.SEEK_CUR: return self._position elif whence == io.SEEK_END: return self._position return self._position def tell(self): return self._position def seekable(self): return True def readable(self): return True @mediaRouter.post('/upload') @mediaRouter.post('/v2/upload') async def upload_file( file: UploadFile = File(...), current_user: models.User = Depends(get_current_user), ): """ Загружает файл на Google Drive в режиме стриминга без блокировки RAM. """ if not file.filename: raise HTTPException(status_code=400, detail='No selected file') max_upload_size = getattr(config, "MEDIA_UPLOAD_MAX_BYTES", 52428800) file_size = file.size if file_size and file_size > max_upload_size: raise HTTPException( status_code=400, detail=f'File too large (max {max_upload_size} bytes)' ) db = models.SessionLocal() try: if file_size: _cleanup_google_drive_quota(db, current_user.id, file_size) service = _get_drive_service() file_id = uuid.uuid4().hex file_metadata = { 'name': f"{file_id}.enc", 'parents': [config.GOOGLE_DRIVE_FOLDER_ID] } # Передаем file.file (синхронный поток FastAPI) stream = SeekableFastAPIStream(file.file) # 1. Задаем размер чанка (строго кратен 256 КБ). # Если файлы тяжелые, можно увеличить до 5 * 1024 * 1024 (5 МБ) для стабильности SSL chunk_size = 5 * 1024 * 1024 # 1 МБ # 2. Инициализируем медиа-загрузчик # Если file_size известен (из заголовков), обязательно передаем его Google. # Если не передан (None), Google будет ждать закрывающий чанк, что часто вызывает сброс SSL. media = MediaIoBaseUpload( stream, mimetype=file.content_type or 'application/octet-stream', chunksize=chunk_size, resumable=True ) if file_size: media._size = file_size # 3. Переписываем логику выполнения загрузки. # Вместо одного слепого вызова .execute() мы инициализируем запрос # и пошагово отправляем чанки. Это предотвращает таймауты SSL-сессии. def _execute_resumable_upload(): request = service.files().create( body=file_metadata, media_body=media, fields='id,size', supportsAllDrives=True ) response = None retries = 0 max_retries = 3 while response is None: try: status, response = request.next_chunk() if status: print(f"Uploaded {int(status.progress() * 100)}%...") retries = 0 # Сбрасываем попытки при успешном чанке except Exception as e: retries += 1 print( f"Ошибка при загрузке чанка: {e}. Попытка {retries} из {max_retries}") if retries >= max_retries: raise e # Если интернет совсем пропал — падаем окончательно import time time.sleep(1) # Ждем секунду перед повторной попыткой return response # Запускаем пошаговый стриминг в пуле потоков drive_file = await asyncio.to_thread(_execute_resumable_upload) drive_id = drive_file.get('id') final_size = int(drive_file.get('size', 0) ) if file_size is None else file_size media_item = models.MediaItem( file_id=file_id, owner_id=current_user.id, original_filename=file.filename, content_type=file.content_type or 'application/octet-stream', storage_file_id=drive_id, size_bytes=final_size, ) db.add(media_item) db.commit() except Exception as e: print(f"Upload operation failed: {str(e)}") raise HTTPException( status_code=500, detail=f"Upload operation failed: {str(e)}" ) finally: db.close() await file.close() return {'status': 'ok', 'file_id': file_id} # --------------------------------------------------------------------------- # Эндпоинты: Получение метаданных и скачивание (Size / Download) # --------------------------------------------------------------------------- @mediaRouter.get('/size/{file_id}') async def get_file_size(file_id: str): """ Возвращает информацию о размере и типе файла из таблицы media_items. """ db = models.SessionLocal() db_file = None try: db_file = db.query(models.MediaItem).filter( models.MediaItem.file_id == file_id ).first() finally: db.close() if not db_file: raise HTTPException(status_code=404, detail='File not found') encoded_filename = urllib.parse.quote(db_file.original_filename) return { "file_id": file_id, "size": db_file.size_bytes, "file_name": encoded_filename, "content_type": db_file.content_type } @mediaRouter.get('/{file_id}') async def get_file(file_id: str): db = models.SessionLocal() try: db_file = db.query(models.MediaItem).filter( models.MediaItem.file_id == file_id ).first() finally: db.close() if not db_file: raise HTTPException(status_code=404, detail='File not found') drive_id = db_file.storage_file_id try: service = _get_drive_service() request = service.files().get_media(fileId=drive_id) async def _async_stream_drive_file(): fh = io.BytesIO() downloader = MediaIoBaseDownload( fh, request, chunksize=1024 * 1024) done = False last_position = 0 while not done: # Оборачиваем синхронный сетевой запрос Google SDK в asyncio.to_thread status, done = await asyncio.to_thread(downloader.next_chunk) fh.seek(last_position) chunk = fh.read() if chunk: yield chunk last_position = fh.tell() encoded_filename = urllib.parse.quote(db_file.original_filename) headers = { "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}" } # StreamingResponse отлично работает с async генераторами return StreamingResponse( _async_stream_drive_file(), media_type=db_file.content_type, headers=headers ) except Exception as e: raise HTTPException( status_code=502, detail=f"Error fetching file from Google Drive: {str(e)}" ) # --------------------------------------------------------------------------- # Эндпоинты: Копирование файлов (Copy) # --------------------------------------------------------------------------- @mediaRouter.post('/copy') async def copy( file_id: str = Form(...), current_user: models.User = Depends(get_current_user), ): """ Копирует файл внутри Google Drive новому пользователю с валидацией его личной квоты. """ db = models.SessionLocal() old_record = None try: old_record = db.query(models.MediaItem).filter( models.MediaItem.file_id == file_id ).first() finally: db.close() if not old_record: raise HTTPException(status_code=404, detail='Source file not found') db = models.SessionLocal() try: # Проверяем квоту для получателя копии (и удаляем его старые файлы при необходимости) _cleanup_google_drive_quota(db, current_user.id, old_record.size_bytes) new_file_id = uuid.uuid4().hex service = _get_drive_service() def _execute_copy(): return service.files().copy( fileId=old_record.storage_file_id, body={'name': f"{new_file_id}.enc"}, fields='id' ).execute() drive_file = await asyncio.to_thread(_execute_copy) new_drive_id = drive_file.get('id') # Сохранение информации о скопированном файле new_record = models.MediaItem( file_id=new_file_id, owner_id=current_user.id, original_filename=old_record.original_filename, content_type=old_record.content_type, storage_file_id=new_drive_id, size_bytes=old_record.size_bytes, ) db.add(new_record) db.commit() except Exception as e: raise HTTPException( status_code=500, detail=f"Copy operation failed: {str(e)}") finally: db.close() return {"status": "ok", "new_file_id": new_file_id}