Files
call-review-platform/services/ingest-service/app/api/uis.py

57 lines
2.1 KiB
Python

from datetime import datetime
from enum import Enum
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, HttpUrl
from sqlalchemy.ext.asyncio import AsyncSession
from app.infrastructure.database import get_db
from app import crud
class CallDirection(str, Enum):
in_ = "in"
out = "out"
class UisCallEvent(BaseModel):
eventType: str
call_session_id: int
direction: CallDirection
employee_id: int
employee_full_name: str
contact_phone_number: str
called_phone_number: str
communication_group_name: str
start_time: datetime
finish_time: datetime
talk_time_duration: int
full_record_file_link: HttpUrl
campaign_name: str
router = APIRouter()
@router.post("/webhook", status_code=204)
async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None:
"""Webhook для получения событий звонков от UIS"""
# Преобразуем UisCallEvent в данные для CallEvent (БД)
event_data = {
"call_session_id": callEvent.call_session_id,
"direction": callEvent.direction.value,
"employee_id": callEvent.employee_id,
"last_answered_employee_full_name": callEvent.employee_full_name,
"finish_time": int(callEvent.finish_time.timestamp()),
"talk_time_duration": callEvent.talk_time_duration,
"full_record_file_link": str(callEvent.full_record_file_link),
# Поля, которых нет в UisCallEvent - заполняем значениями по умолчанию
"notification_mnemonic": callEvent.eventType,
"tcm_topcrm_notification_name": callEvent.campaign_name,
"total_time_duration": callEvent.talk_time_duration, # Упрощение
"wait_time_duration": 0, # Нет в запросе
"total_wait_time_duration": 0, # Нет в запросе
"clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение
}
# Сохраняем в БД
await crud.create_call_event(db, event_data)