import asyncio
import logging

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

from app.models.call_event import CallEvent
from app.infrastructure.s3 import download_and_upload_stream
from app.infrastructure.database import get_db
from app.config import settings

logger = logging.getLogger(__name__)

# Maximum number of call records claimed per processing run.
BATCH_SIZE = 20
# Records whose ``retries`` counter has reached this value are no longer selected.
MAX_RETRIES_CALL = 3


async def fetch_pending_calls(session: AsyncSession):
    """Select and row-lock the next batch of pending call records.

    Picks ``CallEvent`` rows whose ``processing_status`` is ``"pending"`` and
    whose ``retries`` is below ``MAX_RETRIES_CALL``, ordered by ``id`` and
    limited to ``BATCH_SIZE``. The query uses ``FOR UPDATE SKIP LOCKED``, so
    rows already locked by another transaction are skipped instead of
    blocking this one.

    Args:
        session: Async session the query runs in; the row locks are held by
            this session's current transaction.

    Returns:
        The selected ``CallEvent`` objects (empty when nothing matches).
    """
    stmt = (
        select(CallEvent)
        .where(CallEvent.processing_status == "pending")
        .where(CallEvent.retries < MAX_RETRIES_CALL)
        .order_by(CallEvent.id)
        .limit(BATCH_SIZE)
        .with_for_update(skip_locked=True)
    )
    result = await session.execute(stmt)
    return result.scalars().all()


async def process_pending_calls() -> None:
    """Process one batch of pending call records.

    For every session yielded by ``get_db()``:

    1. Fetch a locked batch via ``fetch_pending_calls``; return immediately
       if it is empty.
    2. Increment ``retries``, set ``processing_status`` to ``"processing"``
       and commit, so the claim is persisted before the transfers start.
    3. Run ``download_and_upload_stream`` for all records concurrently.
    4. Mark each record ``"done"`` (and store its ``s3_key``) or ``"failed"``
       and commit.

    NOTE(review): records marked ``"failed"`` no longer match the
    ``"pending"`` filter used by ``fetch_pending_calls``, so nothing in this
    file retries them; presumably something else resets the status --
    confirm.
    """
    logger.info("[INFO] Запуск обработки новых записей")

    async for session in get_db():
        calls = await fetch_pending_calls(session)
        if not calls:
            logger.info("[INFO] Нет новых записей для обработки")
            return

        # Claim the batch right away: bump retries and mark as processing.
        # The link and S3 key are snapshotted here, before the commit: with
        # SQLAlchemy's default expire_on_commit=True the ORM attributes are
        # expired afterwards, and reading them on an AsyncSession would
        # trigger implicit I/O.
        jobs = []
        for call in calls:
            call.retries += 1
            call.processing_status = "processing"
            jobs.append((call, call.full_record_file_link, f"audio/{call.id}.mp3"))
        await session.commit()

        # Build the download/upload coroutines and run them concurrently.
        tasks = []
        for _call, link, key_name in jobs:
            logger.info("Попытка скачать и загрузить запись %s", link)
            tasks.append(download_and_upload_stream(link, key_name))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Record the outcome of every transfer.
        for (call, link, key_name), result in zip(jobs, results):
            # gather(return_exceptions=True) may hand back BaseException
            # instances (e.g. asyncio.CancelledError), not only Exception
            # subclasses; all of them count as failures.
            if isinstance(result, BaseException) or result is False:
                logger.warning(
                    "Не удалось скачать или загрузить запись %s: %r", link, result
                )
                call.processing_status = "failed"
            else:
                call.processing_status = "done"
                call.s3_key = key_name

        await session.commit()
        logger.info(f"Обработка пачки из {len(calls)} записей завершена")