feature: add scheduler
This commit is contained in:
18
services/ingest-service/app/config.py
Normal file
18
services/ingest-service/app/config.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import os
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
class Settings(BaseSettings):
    """Typed runtime configuration for the ingest service.

    No field declares a default, so every value must be supplied when
    ``Settings()`` is constructed; a missing value fails validation.
    """

    # Database connection string.
    DATABASE_URL: str

    # S3 connection parameters and target bucket.
    S3_ENDPOINT_URL: str
    AWS_ACCESS_KEY_ID: str
    AWS_SECRET_ACCESS_KEY: str
    AWS_REGION: str
    S3_BUCKET_NAME: str

    # Retry policy: number of attempts and the pause between them.
    # NOTE(review): RETRY_DELAY is passed to asyncio.sleep, i.e. seconds.
    MAX_RETRIES: int
    RETRY_DELAY: int

    # NOTE: reading a ``.env`` file is not configured here; the options
    # below are left commented out.
    # class Config:
    #     env_file = ".env"
    #     env_file_encoding = "utf-8"
|
||||
|
||||
settings = Settings()
|
||||
67
services/ingest-service/app/infrastructure/s3.py
Normal file
67
services/ingest-service/app/infrastructure/s3.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import aioboto3
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def download_and_upload_stream(url: str, key_name: str) -> bool:
    """Download the audio file at ``url`` and pass it to ``upload_file_to_s3``.

    The response body is read in pieces of up to 1 MiB, but the pieces are
    joined into a single ``bytes`` object in memory before the upload starts.

    Args:
        url: Address of the recording to fetch.
        key_name: Object key handed to ``upload_file_to_s3``.

    Returns:
        Whatever ``upload_file_to_s3`` returns, or ``False`` when any
        exception is raised while downloading or uploading.
    """
    read_size = 1024 * 1024  # 1 MiB per read

    try:
        logger.info(f"[INFO] Начало скачивания {url}")
        async with aiohttp.ClientSession() as http_session, http_session.get(url) as resp:
            # Turn HTTP error statuses into exceptions before reading the body.
            resp.raise_for_status()

            # Drain the body piece by piece; an empty read marks the end.
            pieces = []
            while piece := await resp.content.read(read_size):
                pieces.append(piece)
            file_bytes = b"".join(pieces)

            logger.info(f"[INFO] Скачивание {url} завершено, размер {len(file_bytes)} байт")

            # The upload helper performs its own retries.
            return await upload_file_to_s3(file_bytes, key_name)

    except Exception as e:
        logger.error(f"[FAILED] Не удалось скачать или загрузить {url}: {e}")
        return False
|
||||
|
||||
async def upload_file_to_s3(file_bytes: bytes, key_name: str) -> bool:
    """Upload ``file_bytes`` to S3 under ``key_name``, retrying on failure.

    At most ``settings.MAX_RETRIES`` attempts are made, with a pause of
    ``settings.RETRY_DELAY`` seconds between consecutive attempts. A new S3
    client is opened for every attempt.

    Args:
        file_bytes: Complete object body to store.
        key_name: Object key inside ``settings.S3_BUCKET_NAME``.

    Returns:
        ``True`` as soon as one attempt succeeds; ``False`` when every
        attempt failed, including the case where ``settings.MAX_RETRIES`` is
        below 1 and no attempt is made at all.
    """
    session = aioboto3.Session()

    for attempt in range(1, settings.MAX_RETRIES + 1):
        try:
            logger.info(f"Попытка {attempt} загрузки {key_name} в S3")

            async with session.client(
                "s3",
                endpoint_url=settings.S3_ENDPOINT_URL,
                aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
                region_name=settings.AWS_REGION,
            ) as s3:
                await s3.put_object(
                    Bucket=settings.S3_BUCKET_NAME,
                    Key=key_name,
                    Body=file_bytes,
                )
                logger.info(f"[OK] Файл {key_name} загружен в S3!")
                return True
        except Exception as e:
            logger.exception(f"[ERROR] Попытка {attempt} для {key_name} не удалась: {e}")
            # Pause only when another attempt will follow.
            if attempt < settings.MAX_RETRIES:
                await asyncio.sleep(settings.RETRY_DELAY)

    # Reached when all attempts failed or when the loop never ran; returning
    # explicitly keeps the declared ``bool`` contract instead of an implicit None.
    logger.error(f"[FAILED] Файл {key_name} не удалось загрузить после {settings.MAX_RETRIES} попыток")
    return False
|
||||
60
services/ingest-service/app/infrastructure/scheduler.py
Normal file
60
services/ingest-service/app/infrastructure/scheduler.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
|
||||
from app.models.call_event import CallEvent
|
||||
from app.infrastructure.s3 import download_and_upload_stream
|
||||
from app.infrastructure.database import get_db
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BATCH_SIZE = 20
|
||||
MAX_RETRIES_CALL = 3
|
||||
async def fetch_pending_calls(session: AsyncSession):
    """Select and row-lock the next batch of call events awaiting processing.

    Picks rows whose ``processing_status`` is ``"pending"`` and whose
    ``retries`` is below ``MAX_RETRIES_CALL``, ordered by ``id`` and capped at
    ``BATCH_SIZE``. The query uses ``FOR UPDATE SKIP LOCKED``, so rows already
    locked by another transaction are skipped instead of waited on.

    Args:
        session: Async session the query is executed on.

    Returns:
        The matching ``CallEvent`` objects.
    """
    query = select(CallEvent).where(
        CallEvent.processing_status == "pending",
        CallEvent.retries < MAX_RETRIES_CALL,
    )
    query = query.order_by(CallEvent.id).limit(BATCH_SIZE)
    query = query.with_for_update(skip_locked=True)

    rows = await session.execute(query)
    return rows.scalars().all()
|
||||
|
||||
async def process_pending_calls():
    """Process one batch of pending call recordings.

    Steps:
      1. Fetch a batch via ``fetch_pending_calls``.
      2. Increment ``retries``, mark each call ``"processing"`` and commit.
      3. Download and upload all recordings concurrently.
      4. Mark each call ``"done"`` (storing its ``s3_key``) or ``"failed"``
         and commit.

    Returns:
        None. Returns early when there is nothing to process.
    """
    logger.info("[INFO] Запуск обработки новых записей")

    async for session in get_db():
        calls = await fetch_pending_calls(session)
        if not calls:
            logger.info("[INFO] Нет новых записей для обработки")
            return

        # Bump the retry counter and claim the rows. The link and the S3 key
        # are captured BEFORE the commit: with SQLAlchemy's default
        # ``expire_on_commit=True`` the ORM attributes are expired by the
        # commit, and reading them afterwards would need an implicit refresh,
        # which is not allowed under asyncio.
        # NOTE(review): get_db's session factory is not visible here; if it
        # already sets expire_on_commit=False this snapshot is merely harmless.
        jobs = []
        for call in calls:
            call.retries += 1
            call.processing_status = "processing"
            jobs.append((call, call.full_record_file_link, f"audio/{call.id}.mp3"))
        await session.commit()

        # Build one download/upload coroutine per call.
        tasks = []
        for _, record_link, key_name in jobs:
            logger.info("Попытка скачать и загрузить запись %s", record_link)
            tasks.append(download_and_upload_stream(record_link, key_name))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Update statuses. Only an explicit ``True`` counts as success, so
        # exception objects of any kind (including cancellation), ``False``
        # and ``None`` are all recorded as failures.
        # NOTE(review): rows marked "failed" are no longer matched by the
        # "pending" filter in fetch_pending_calls, so they are not retried by
        # this job even while retries < MAX_RETRIES_CALL - confirm intent.
        for (call, _, key_name), result in zip(jobs, results):
            if result is True:
                call.processing_status = "done"
                call.s3_key = key_name
            else:
                call.processing_status = "failed"

        await session.commit()
        logger.info(f"Обработка пачки из {len(calls)} записей завершена")
|
||||
@@ -1,10 +1,16 @@
|
||||
from contextlib import asynccontextmanager
|
||||
from fastapi import FastAPI, Request, status
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.concurrency import asynccontextmanager
|
||||
import logging
|
||||
from app.infrastructure.scheduler import process_pending_calls
|
||||
|
||||
from app.api.uis import router as uis_router
|
||||
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
from fastapi import FastAPI
|
||||
import asyncio
|
||||
from app.infrastructure.database import get_db
|
||||
|
||||
# Настройка логирования
|
||||
logging.basicConfig(
|
||||
@@ -13,13 +19,23 @@ logging.basicConfig(
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the background scheduler for the lifetime of the application.

    On startup an ``AsyncIOScheduler`` is created and ``process_pending_calls``
    is registered as an interval job firing every 10 seconds. On shutdown the
    scheduler is stopped, also when the application exits with an error.

    Args:
        app: The FastAPI application (unused; required by the lifespan
            protocol).
    """
    scheduler = AsyncIOScheduler()
    scheduler.add_job(process_pending_calls, "interval", seconds=10)
    scheduler.start()

    try:
        yield
    finally:
        # In APScheduler 3.x ``shutdown`` is a regular method that returns
        # None; awaiting its result raises TypeError, so call it directly.
        scheduler.shutdown(wait=True)
|
||||
|
||||
app = FastAPI(
|
||||
title="Ingest Service API",
|
||||
description="Микросервис для приема событий звонков",
|
||||
version="1.0.0",
|
||||
lifespan=lifespan
|
||||
)
|
||||
|
||||
|
||||
# Exception handlers
|
||||
@app.exception_handler(CallEventAlreadyExistsError)
|
||||
async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError):
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from sqlalchemy import Column, BigInteger, Integer, String, DateTime
|
||||
from sqlalchemy import Column, BigInteger, Integer, String, DateTime, Enum as SQLEnum
|
||||
from enum import Enum
|
||||
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
|
||||
import uuid
|
||||
from app.infrastructure.database import Base
|
||||
from enum import Enum as PyEnum
|
||||
|
||||
class CallEvent(Base):
|
||||
__tablename__ = "call_events"
|
||||
@@ -20,3 +22,7 @@ class CallEvent(Base):
|
||||
clean_talk_time_duration = Column(Integer, nullable=False)
|
||||
full_record_file_link = Column(String, nullable=False)
|
||||
tcm_topcrm_notification_name = Column(String, nullable=False)
|
||||
|
||||
s3_key = Column(String, nullable=True)
|
||||
processing_status = Column(String, default="pending", nullable=False)
|
||||
retries = Column(Integer, default=0, nullable=False)
|
||||
@@ -6,12 +6,8 @@ services:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
container_name: ingest-service
|
||||
environment:
|
||||
# Подключение к PostgreSQL из infra/docker-compose.yaml
|
||||
DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db
|
||||
APP_HOST: 0.0.0.0
|
||||
APP_PORT: 8000
|
||||
DEBUG: "False"
|
||||
env_file:
|
||||
- .env
|
||||
ports:
|
||||
- "8000:8000"
|
||||
networks:
|
||||
@@ -21,7 +17,13 @@ services:
|
||||
command: >
|
||||
sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"
|
||||
healthcheck:
|
||||
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()"]
|
||||
test:
|
||||
[
|
||||
"CMD",
|
||||
"python",
|
||||
"-c",
|
||||
"import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()",
|
||||
]
|
||||
interval: 30s
|
||||
timeout: 3s
|
||||
start_period: 10s
|
||||
|
||||
@@ -5,4 +5,6 @@ sqlalchemy==2.0.23
|
||||
asyncpg==0.29.0
|
||||
alembic==1.13.1
|
||||
python-dotenv==1.0.0
|
||||
|
||||
aioboto3==15.5.0
|
||||
apscheduler==3.11.1
|
||||
pydantic_settings==2.11.0
|
||||
Reference in New Issue
Block a user