From a5d3a72d0c41389b611774ec8ec1c84f5317f8c5 Mon Sep 17 00:00:00 2001 From: Ilya Bikmeev Date: Sat, 6 Dec 2025 18:25:08 +0500 Subject: [PATCH] feature: add scheduler --- services/ingest-service/app/config.py | 18 +++++ .../ingest-service/app/infrastructure/s3.py | 67 +++++++++++++++++++ .../app/infrastructure/scheduler.py | 60 +++++++++++++++++ services/ingest-service/app/main.py | 20 +++++- .../ingest-service/app/models/call_event.py | 8 ++- services/ingest-service/docker-compose.yaml | 16 +++-- services/ingest-service/requirements.txt | 4 +- 7 files changed, 182 insertions(+), 11 deletions(-) create mode 100644 services/ingest-service/app/config.py create mode 100644 services/ingest-service/app/infrastructure/s3.py create mode 100644 services/ingest-service/app/infrastructure/scheduler.py diff --git a/services/ingest-service/app/config.py b/services/ingest-service/app/config.py new file mode 100644 index 0000000..8280e25 --- /dev/null +++ b/services/ingest-service/app/config.py @@ -0,0 +1,18 @@ +import os +from pydantic_settings import BaseSettings + +class Settings(BaseSettings): + DATABASE_URL: str + S3_ENDPOINT_URL: str + AWS_ACCESS_KEY_ID: str + AWS_SECRET_ACCESS_KEY: str + AWS_REGION: str + S3_BUCKET_NAME: str + MAX_RETRIES: int + RETRY_DELAY: int + + # class Config: + # env_file = ".env" + # env_file_encoding = "utf-8" + +settings = Settings() \ No newline at end of file diff --git a/services/ingest-service/app/infrastructure/s3.py b/services/ingest-service/app/infrastructure/s3.py new file mode 100644 index 0000000..59234ef --- /dev/null +++ b/services/ingest-service/app/infrastructure/s3.py @@ -0,0 +1,67 @@ +import aioboto3 +import asyncio +import logging +import os + +import aiohttp +from app.config import settings + +logger = logging.getLogger(__name__) + +async def download_and_upload_stream(url: str, key_name: str) -> bool: + """ + Скачивает аудиофайл по URL потоками и передаёт в upload_file_to_s3 для загрузки. + """ + try: + logger.info(f"[INFO] Начало скачивания {url}") + async with aiohttp.ClientSession() as http_session: + async with http_session.get(url) as resp: + resp.raise_for_status() + + # читаем файл по частям и собираем в bytes + chunks = [] + while True: + chunk = await resp.content.read(1024 * 1024) # 1 МБ + if not chunk: + break + chunks.append(chunk) + file_bytes = b"".join(chunks) + + logger.info(f"[INFO] Скачивание {url} завершено, размер {len(file_bytes)} байт") + + # передаём в функцию загрузки с retry + success = await upload_file_to_s3(file_bytes, key_name) + return success + + except Exception as e: + logger.error(f"[FAILED] Не удалось скачать или загрузить {url}: {e}") + return False + +async def upload_file_to_s3(file_bytes: bytes, key_name: str) -> bool: + """ + Асинхронно загружает файл в S3 с ретраями. + Возвращает True при успехе, False при ошибке. + """ + session = aioboto3.Session() + for attempt in range(1, settings.MAX_RETRIES + 1): + try: + logger.info(f"Попытка {attempt} загрузки {key_name} в S3") + #logger.info(f"Параметры S3: {S3_ENDPOINT_URL}, {S3_BUCKET_NAME}, {AWS_REGION}") + + async with session.client( + "s3", + endpoint_url=settings.S3_ENDPOINT_URL, + aws_access_key_id=settings.AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, + region_name=settings.AWS_REGION + ) as s3: + await s3.put_object(Bucket=settings.S3_BUCKET_NAME, Key=key_name, Body=file_bytes) + logger.info(f"[OK] Файл {key_name} загружен в S3!") + return True + except Exception as e: + logger.exception(f"[ERROR] Попытка {attempt} для {key_name} не удалась: {e}") + if attempt < settings.MAX_RETRIES: + await asyncio.sleep(settings.RETRY_DELAY) + else: + logger.error(f"[FAILED] Файл {key_name} не удалось загрузить после {settings.MAX_RETRIES} попыток") + return False \ No newline at end of file diff --git a/services/ingest-service/app/infrastructure/scheduler.py b/services/ingest-service/app/infrastructure/scheduler.py new file mode 100644 index 0000000..9b9739d --- /dev/null +++ b/services/ingest-service/app/infrastructure/scheduler.py @@ -0,0 +1,60 @@ +import asyncio +import logging +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from app.models.call_event import CallEvent +from app.infrastructure.s3 import download_and_upload_stream +from app.infrastructure.database import get_db +from app.config import settings + +logger = logging.getLogger(__name__) + +BATCH_SIZE = 20 +MAX_RETRIES_CALL = 3 +async def fetch_pending_calls(session: AsyncSession): + stmt = ( + select(CallEvent) + .where(CallEvent.processing_status == "pending") + .where(CallEvent.retries < MAX_RETRIES_CALL) + .order_by(CallEvent.id) + .limit(BATCH_SIZE) + .with_for_update(skip_locked=True) + ) + result = await session.execute(stmt) + calls = result.scalars().all() + return calls + +async def process_pending_calls(): + logger.info("[INFO] Запуск обработки новых записей") + + async for session in get_db(): + calls = await fetch_pending_calls(session) + if not calls: + logger.info("[INFO] Нет новых записей для обработки") + return + + # сразу увеличиваем retries и помечаем processing + for call in calls: + call.retries += 1 + call.processing_status = "processing" + await session.commit() + + # создаём задачи на скачивание/загрузку + tasks = [] + for call in calls: + key_name = f"audio/{call.id}.mp3" + logger.info("Попытка скачать и загрузить запись %s", call.full_record_file_link) + tasks.append(download_and_upload_stream(call.full_record_file_link, key_name)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + # обновляем статус после обработки + for call, result in zip(calls, results): + if isinstance(result, Exception) or result is False: + call.processing_status = "failed" + else: + call.processing_status = "done" + call.s3_key = f"audio/{call.id}.mp3" + + await session.commit() + logger.info(f"Обработка пачки из {len(calls)} записей завершена") \ No newline at end of file diff --git a/services/ingest-service/app/main.py b/services/ingest-service/app/main.py index 29592f9..d2310df 100644 --- a/services/ingest-service/app/main.py +++ b/services/ingest-service/app/main.py @@ -1,10 +1,16 @@ -from contextlib import asynccontextmanager from fastapi import FastAPI, Request, status from fastapi.responses import JSONResponse +from fastapi.concurrency import asynccontextmanager import logging +from app.infrastructure.scheduler import process_pending_calls from app.api.uis import router as uis_router from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +from fastapi import FastAPI +import asyncio +from app.infrastructure.database import get_db # Настройка логирования logging.basicConfig( @@ -13,13 +19,23 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) +@asynccontextmanager +async def lifespan(app:FastAPI): + scheduler = AsyncIOScheduler() + + scheduler.add_job(process_pending_calls, "interval", seconds = 10) + + scheduler.start() + yield + await scheduler.shutdown(wait=True) + app = FastAPI( title="Ingest Service API", description="Микросервис для приема событий звонков", version="1.0.0", + lifespan=lifespan ) - # Exception handlers @app.exception_handler(CallEventAlreadyExistsError) async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError): diff --git a/services/ingest-service/app/models/call_event.py b/services/ingest-service/app/models/call_event.py index be2c0f3..cf46b5f 100644 --- a/services/ingest-service/app/models/call_event.py +++ b/services/ingest-service/app/models/call_event.py @@ -1,7 +1,9 @@ -from sqlalchemy import Column, BigInteger, Integer, String, DateTime +from sqlalchemy import Column, BigInteger, Integer, String, DateTime, Enum as SQLEnum +from enum import Enum from sqlalchemy.dialects.postgresql import UUID as PG_UUID import uuid from app.infrastructure.database import Base +from enum import Enum as PyEnum class CallEvent(Base): __tablename__ = "call_events" @@ -20,3 +22,7 @@ class CallEvent(Base): clean_talk_time_duration = Column(Integer, nullable=False) full_record_file_link = Column(String, nullable=False) tcm_topcrm_notification_name = Column(String, nullable=False) + + s3_key = Column(String, nullable=True) + processing_status = Column(String, default="pending", nullable=False) + retries = Column(Integer, default=0, nullable=False) \ No newline at end of file diff --git a/services/ingest-service/docker-compose.yaml b/services/ingest-service/docker-compose.yaml index 3432814..a4bf20b 100644 --- a/services/ingest-service/docker-compose.yaml +++ b/services/ingest-service/docker-compose.yaml @@ -6,12 +6,8 @@ services: context: . dockerfile: Dockerfile container_name: ingest-service - environment: - # Подключение к PostgreSQL из infra/docker-compose.yaml - DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db - APP_HOST: 0.0.0.0 - APP_PORT: 8000 - DEBUG: "False" + env_file: + - .env ports: - "8000:8000" networks: @@ -21,7 +17,13 @@ services: command: > sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000" healthcheck: - test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()"] + test: + [ + "CMD", + "python", + "-c", + "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()", + ] interval: 30s timeout: 3s start_period: 10s diff --git a/services/ingest-service/requirements.txt b/services/ingest-service/requirements.txt index 8c573e7..47d4c7b 100644 --- a/services/ingest-service/requirements.txt +++ b/services/ingest-service/requirements.txt @@ -5,4 +5,6 @@ sqlalchemy==2.0.23 asyncpg==0.29.0 alembic==1.13.1 python-dotenv==1.0.0 - +aioboto3==15.5.0 +apscheduler==3.11.1 +pydantic_settings==2.11.0 \ No newline at end of file