Доработка вебхука

This commit is contained in:
2025-11-20 23:16:09 +07:00
parent 8f5b984598
commit 1538f72c8f
5 changed files with 69 additions and 125 deletions

View File

@@ -1,29 +1,22 @@
from logging.config import fileConfig from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context from alembic import context
import os import asyncio
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
from app.core.database import Base from app.infrastructure.database import engine, Base
# Важно! Импортируем все модели, чтобы они зарегистрировались в Base.metadata # Важно! Импортируем все модели, чтобы они зарегистрировались в Base.metadata
from app.models import CallEvent from app.models import CallEvent
config = context.config config = context.config
# Переопределяем URL из переменных окружения
database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/ingest_db")
# Преобразуем asyncpg URL в psycopg2
sync_database_url = database_url.replace("postgresql+asyncpg://", "postgresql://")
config.set_main_option("sqlalchemy.url", sync_database_url)
if config.config_file_name is not None: if config.config_file_name is not None:
fileConfig(config.config_file_name) fileConfig(config.config_file_name)
target_metadata = Base.metadata target_metadata = Base.metadata
def run_migrations_offline() -> None: def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode. """Run migrations in 'offline' mode.
@@ -48,26 +41,25 @@ def run_migrations_offline() -> None:
context.run_migrations() context.run_migrations()
def do_run_migrations(connection):
"""Запускаем миграции с существующим подключением"""
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations():
"""Асинхронный запуск миграций через async движок"""
async with engine.connect() as connection:
await connection.run_sync(do_run_migrations)
await engine.dispose()
def run_migrations_online() -> None: def run_migrations_online() -> None:
"""Run migrations in 'online' mode. """Run migrations in 'online' mode через async движок"""
asyncio.run(run_async_migrations())
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode(): if context.is_offline_mode():

View File

@@ -5,7 +5,7 @@ from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, HttpUrl from pydantic import BaseModel, HttpUrl
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db from app.infrastructure.database import get_db
from app import crud from app import crud
class CallDirection(str, Enum): class CallDirection(str, Enum):
@@ -15,7 +15,7 @@ class CallDirection(str, Enum):
class UisCallEvent(BaseModel): class UisCallEvent(BaseModel):
eventType: str eventType: str
call_session_id: str call_session_id: int
direction: CallDirection direction: CallDirection
employee_id: int employee_id: int
employee_full_name: str employee_full_name: str
@@ -28,91 +28,30 @@ class UisCallEvent(BaseModel):
full_record_file_link: HttpUrl full_record_file_link: HttpUrl
campaign_name: str campaign_name: str
class Config:
from_attributes = True
class CallEventResponse(BaseModel):
id: int
event_type: str
call_session_id: str
direction: str
employee_id: int
employee_full_name: str
contact_phone_number: str
called_phone_number: str
communication_group_name: str
start_time: datetime
finish_time: datetime
talk_time_duration: int
full_record_file_link: str
campaign_name: str
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
router = APIRouter() router = APIRouter()
@router.post("/webhook", response_model=CallEventResponse, status_code=201) @router.post("/webhook", status_code=204)
async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)): async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None:
"""Webhook для получения событий звонков от UIS""" """Webhook для получения событий звонков от UIS"""
# Проверяем, не существует ли уже событие с таким call_session_id
existing_event = await crud.get_call_event_by_session_id(db, callEvent.call_session_id)
if existing_event:
raise HTTPException(status_code=400, detail="Call event with this session_id already exists")
# Преобразуем Pydantic модель в словарь для БД # Преобразуем UisCallEvent в данные для CallEvent (БД)
call_event_data = { event_data = {
"event_type": callEvent.eventType,
"call_session_id": callEvent.call_session_id, "call_session_id": callEvent.call_session_id,
"direction": callEvent.direction, "direction": callEvent.direction.value,
"employee_id": callEvent.employee_id, "employee_id": callEvent.employee_id,
"employee_full_name": callEvent.employee_full_name, "last_answered_employee_full_name": callEvent.employee_full_name,
"contact_phone_number": callEvent.contact_phone_number, "finish_time": int(callEvent.finish_time.timestamp()),
"called_phone_number": callEvent.called_phone_number,
"communication_group_name": callEvent.communication_group_name,
"start_time": callEvent.start_time,
"finish_time": callEvent.finish_time,
"talk_time_duration": callEvent.talk_time_duration, "talk_time_duration": callEvent.talk_time_duration,
"full_record_file_link": str(callEvent.full_record_file_link), "full_record_file_link": str(callEvent.full_record_file_link),
"campaign_name": callEvent.campaign_name, # Поля, которых нет в UisCallEvent - заполняем значениями по умолчанию
"notification_mnemonic": callEvent.eventType,
"tcm_topcrm_notification_name": callEvent.campaign_name,
"total_time_duration": callEvent.talk_time_duration, # Упрощение
"wait_time_duration": 0, # Нет в запросе
"total_wait_time_duration": 0, # Нет в запросе
"clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение
} }
db_call_event = await crud.create_call_event(db, call_event_data) # Сохраняем в БД
return db_call_event await crud.create_call_event(db, event_data)
@router.get("/events", response_model=List[CallEventResponse])
async def get_call_events(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db)
):
"""Получить список всех событий звонков"""
events = await crud.get_all_call_events(db, skip=skip, limit=limit)
return events
@router.get("/events/{call_session_id}", response_model=CallEventResponse)
async def get_call_event(call_session_id: str, db: AsyncSession = Depends(get_db)):
"""Получить событие звонка по session_id"""
event = await crud.get_call_event_by_session_id(db, call_session_id)
if not event:
raise HTTPException(status_code=404, detail="Call event not found")
return event
@router.get("/events/employee/{employee_id}", response_model=List[CallEventResponse])
async def get_employee_call_events(
employee_id: int,
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db)
):
"""Получить все звонки конкретного сотрудника"""
events = await crud.get_call_events_by_employee(db, employee_id, skip=skip, limit=limit)
return events

View File

@@ -0,0 +1,11 @@
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.call_event import CallEvent
async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent:
"""Создать новое событие звонка в БД"""
call_event = CallEvent(**event_data)
db.add(call_event)
await db.commit()
await db.refresh(call_event)
return call_event

View File

@@ -1,22 +1,22 @@
import os import os
from sqlalchemy import create_engine from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
# Для Alembic используем синхронный движок с psycopg2
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db") DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db")
# Преобразуем asyncpg URL в psycopg2 для синхронного движка
SYNC_DATABASE_URL = DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://")
engine = create_engine(SYNC_DATABASE_URL) # Единый async движок для всего приложения
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base() Base = declarative_base()
def get_db():
db = SessionLocal() async def get_db():
try: """Dependency для получения async сессии БД"""
yield db async with AsyncSessionLocal() as session:
finally: try:
db.close() yield session
finally:
await session.close()

View File

@@ -1,10 +1,12 @@
from sqlalchemy import Column, BigInteger, Integer, UUID, String from sqlalchemy import Column, BigInteger, Integer, String, DateTime
from app.core.database import Base from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid
from app.infrastructure.database import Base
class CallEvent(Base): class CallEvent(Base):
__tablename__ = "call_events" __tablename__ = "call_events"
id = Column(UUID, primary_key=True, index=True) id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
call_session_id = Column(BigInteger, nullable=False, index=True) call_session_id = Column(BigInteger, nullable=False, index=True)
direction = Column(String, nullable=False) direction = Column(String, nullable=False)
notification_mnemonic = Column(String, nullable=False) notification_mnemonic = Column(String, nullable=False)