diff --git a/services/ingest-service/alembic/env.py b/services/ingest-service/alembic/env.py index 382bda3..f587a9c 100644 --- a/services/ingest-service/alembic/env.py +++ b/services/ingest-service/alembic/env.py @@ -1,29 +1,22 @@ from logging.config import fileConfig -from sqlalchemy import engine_from_config -from sqlalchemy import pool from alembic import context -import os - +import asyncio from dotenv import load_dotenv + load_dotenv() -from app.core.database import Base +from app.infrastructure.database import engine, Base # Важно! Импортируем все модели, чтобы они зарегистрировались в Base.metadata from app.models import CallEvent config = context.config -# Переопределяем URL из переменных окружения -database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/ingest_db") -# Преобразуем asyncpg URL в psycopg2 -sync_database_url = database_url.replace("postgresql+asyncpg://", "postgresql://") -config.set_main_option("sqlalchemy.url", sync_database_url) - if config.config_file_name is not None: fileConfig(config.config_file_name) target_metadata = Base.metadata + def run_migrations_offline() -> None: """Run migrations in 'offline' mode. @@ -48,26 +41,25 @@ def run_migrations_offline() -> None: context.run_migrations() +def do_run_migrations(connection): + """Запускаем миграции с существующим подключением""" + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations(): + """Асинхронный запуск миграций через async движок""" + async with engine.connect() as connection: + await connection.run_sync(do_run_migrations) + + await engine.dispose() + + def run_migrations_online() -> None: - """Run migrations in 'online' mode. - - In this scenario we need to create an Engine - and associate a connection with the context. - - """ - connectable = engine_from_config( - config.get_section(config.config_ini_section, {}), - prefix="sqlalchemy.", - poolclass=pool.NullPool, - ) - - with connectable.connect() as connection: - context.configure( - connection=connection, target_metadata=target_metadata - ) - - with context.begin_transaction(): - context.run_migrations() + """Run migrations in 'online' mode через async движок""" + asyncio.run(run_async_migrations()) if context.is_offline_mode(): diff --git a/services/ingest-service/app/api/uis.py b/services/ingest-service/app/api/uis.py index e5359a0..ed63f9a 100644 --- a/services/ingest-service/app/api/uis.py +++ b/services/ingest-service/app/api/uis.py @@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel, HttpUrl from sqlalchemy.ext.asyncio import AsyncSession -from app.database import get_db +from app.infrastructure.database import get_db from app import crud class CallDirection(str, Enum): @@ -15,7 +15,7 @@ class CallDirection(str, Enum): class UisCallEvent(BaseModel): eventType: str - call_session_id: str + call_session_id: int direction: CallDirection employee_id: int employee_full_name: str @@ -28,91 +28,30 @@ class UisCallEvent(BaseModel): full_record_file_link: HttpUrl campaign_name: str - class Config: - from_attributes = True - - -class CallEventResponse(BaseModel): - id: int - event_type: str - call_session_id: str - direction: str - employee_id: int - employee_full_name: str - contact_phone_number: str - called_phone_number: str - communication_group_name: str - start_time: datetime - finish_time: datetime - talk_time_duration: int - full_record_file_link: str - campaign_name: str - created_at: datetime - updated_at: datetime - - class Config: - from_attributes = True - - router = APIRouter() -@router.post("/webhook", response_model=CallEventResponse, status_code=201) -async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)): +@router.post("/webhook", status_code=204) +async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None: """Webhook для получения событий звонков от UIS""" - # Проверяем, не существует ли уже событие с таким call_session_id - existing_event = await crud.get_call_event_by_session_id(db, callEvent.call_session_id) - if existing_event: - raise HTTPException(status_code=400, detail="Call event with this session_id already exists") - # Преобразуем Pydantic модель в словарь для БД - call_event_data = { - "event_type": callEvent.eventType, + # Преобразуем UisCallEvent в данные для CallEvent (БД) + event_data = { "call_session_id": callEvent.call_session_id, - "direction": callEvent.direction, + "direction": callEvent.direction.value, "employee_id": callEvent.employee_id, - "employee_full_name": callEvent.employee_full_name, - "contact_phone_number": callEvent.contact_phone_number, - "called_phone_number": callEvent.called_phone_number, - "communication_group_name": callEvent.communication_group_name, - "start_time": callEvent.start_time, - "finish_time": callEvent.finish_time, + "last_answered_employee_full_name": callEvent.employee_full_name, + "finish_time": int(callEvent.finish_time.timestamp()), "talk_time_duration": callEvent.talk_time_duration, "full_record_file_link": str(callEvent.full_record_file_link), - "campaign_name": callEvent.campaign_name, + # Поля, которых нет в UisCallEvent - заполняем значениями по умолчанию + "notification_mnemonic": callEvent.eventType, + "tcm_topcrm_notification_name": callEvent.campaign_name, + "total_time_duration": callEvent.talk_time_duration, # Упрощение + "wait_time_duration": 0, # Нет в запросе + "total_wait_time_duration": 0, # Нет в запросе + "clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение } - db_call_event = await crud.create_call_event(db, call_event_data) - return db_call_event - - -@router.get("/events", response_model=List[CallEventResponse]) -async def get_call_events( - skip: int = 0, - limit: int = 100, - db: AsyncSession = Depends(get_db) -): - """Получить список всех событий звонков""" - events = await crud.get_all_call_events(db, skip=skip, limit=limit) - return events - - -@router.get("/events/{call_session_id}", response_model=CallEventResponse) -async def get_call_event(call_session_id: str, db: AsyncSession = Depends(get_db)): - """Получить событие звонка по session_id""" - event = await crud.get_call_event_by_session_id(db, call_session_id) - if not event: - raise HTTPException(status_code=404, detail="Call event not found") - return event - - -@router.get("/events/employee/{employee_id}", response_model=List[CallEventResponse]) -async def get_employee_call_events( - employee_id: int, - skip: int = 0, - limit: int = 100, - db: AsyncSession = Depends(get_db) -): - """Получить все звонки конкретного сотрудника""" - events = await crud.get_call_events_by_employee(db, employee_id, skip=skip, limit=limit) - return events \ No newline at end of file + # Сохраняем в БД + await crud.create_call_event(db, event_data) \ No newline at end of file diff --git a/services/ingest-service/app/crud.py b/services/ingest-service/app/crud.py new file mode 100644 index 0000000..0855e91 --- /dev/null +++ b/services/ingest-service/app/crud.py @@ -0,0 +1,11 @@ +from sqlalchemy.ext.asyncio import AsyncSession +from app.models.call_event import CallEvent + + +async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent: + """Создать новое событие звонка в БД""" + call_event = CallEvent(**event_data) + db.add(call_event) + await db.commit() + await db.refresh(call_event) + return call_event diff --git a/services/ingest-service/app/infrastructure/database.py b/services/ingest-service/app/infrastructure/database.py index 914461d..c438538 100644 --- a/services/ingest-service/app/infrastructure/database.py +++ b/services/ingest-service/app/infrastructure/database.py @@ -1,22 +1,22 @@ import os -from sqlalchemy import create_engine +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker from dotenv import load_dotenv load_dotenv() -# Для Alembic используем синхронный движок с psycopg2 DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db") -# Преобразуем asyncpg URL в psycopg2 для синхронного движка -SYNC_DATABASE_URL = DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://") -engine = create_engine(SYNC_DATABASE_URL) -SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +# Единый async движок для всего приложения +engine = create_async_engine(DATABASE_URL, echo=True) +AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + Base = declarative_base() -def get_db(): - db = SessionLocal() - try: - yield db - finally: - db.close() \ No newline at end of file + +async def get_db(): + """Dependency для получения async сессии БД""" + async with AsyncSessionLocal() as session: + try: + yield session + finally: + await session.close() \ No newline at end of file diff --git a/services/ingest-service/app/models/call_event.py b/services/ingest-service/app/models/call_event.py index e368d20..a27c60c 100644 --- a/services/ingest-service/app/models/call_event.py +++ b/services/ingest-service/app/models/call_event.py @@ -1,10 +1,12 @@ -from sqlalchemy import Column, BigInteger, Integer, UUID, String -from app.core.database import Base +from sqlalchemy import Column, BigInteger, Integer, String, DateTime +from sqlalchemy.dialects.postgresql import UUID as PG_UUID +import uuid +from app.infrastructure.database import Base class CallEvent(Base): __tablename__ = "call_events" - id = Column(UUID, primary_key=True, index=True) + id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) call_session_id = Column(BigInteger, nullable=False, index=True) direction = Column(String, nullable=False) notification_mnemonic = Column(String, nullable=False)