Compare commits

..

5 Commits

Author SHA1 Message Date
3d4430134b feature: added exception handling 2025-11-20 23:38:26 +07:00
5d5b6140d4 Updated docker 2025-11-20 23:34:49 +07:00
f2fca50d5a Add unique constraint for session id 2025-11-20 23:29:56 +07:00
6691a1a98e Updated README.md 2025-11-20 23:27:52 +07:00
5b3298858c Deleted old models.py 2025-11-20 23:23:09 +07:00
16 changed files with 481 additions and 96 deletions

View File

@@ -17,6 +17,13 @@ services:
interval: 10s
timeout: 5s
retries: 5
networks:
- call-review-network
volumes:
postgres_data:
networks:
call-review-network:
name: call-review-network
driver: bridge

View File

@@ -0,0 +1,40 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
env/
ENV/
.venv
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
# Git
.git/
.gitignore
# Documentation
README.md
*.md
# Environment files
.env
.env.*
# Testing
.pytest_cache/
.coverage
htmlcov/
*.log
# OS
.DS_Store
Thumbs.db

View File

@@ -4,4 +4,5 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
# Application
APP_HOST=0.0.0.0
APP_PORT=8000
DEBUG=False

View File

@@ -1,11 +1,40 @@
FROM python:3.9-slim
# Multi-stage build для уменьшения размера образа
FROM python:3.9-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --user --no-cache-dir -r requirements.txt
COPY app ./app
# Финальный образ
FROM python:3.9-slim
WORKDIR /app
# Создаем непривилегированного пользователя для безопасности
RUN useradd -m -u 1000 appuser && \
chown -R appuser:appuser /app
# Копируем установленные пакеты из builder stage
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
# Копируем код приложения
COPY --chown=appuser:appuser app ./app
# Копируем файлы миграций Alembic
COPY --chown=appuser:appuser alembic ./alembic
COPY --chown=appuser:appuser alembic.ini .
# Переключаемся на непривилегированного пользователя
USER appuser
# Добавляем установленные пакеты в PATH
ENV PATH=/home/appuser/.local/bin:$PATH
# Healthcheck для проверки состояния сервиса
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()" || exit 1
# Запуск приложения
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -16,31 +16,61 @@
ingest-service/
├── app/
│ ├── __init__.py
│ ├── main.py # Главный файл приложения
│ ├── database.py # Конфигурация БД
│ ├── models.py # SQLAlchemy модели
│ ├── crud.py # CRUD операции
│ └── api/
│ ├── main.py # Главный файл приложения
│ ├── crud.py # CRUD операции
│ ├── api/
│ ├── __init__.py
│ └── uis.py # API endpoints для UIS
│ ├── infrastructure/
│ │ ├── __init__.py
│ │ └── database.py # Конфигурация БД
│ └── models/
│ ├── __init__.py
│ └── uis.py # API endpoints для UIS
├── docker-compose.yaml
│ └── call_event.py # SQLAlchemy модели
├── alembic/ # Миграции БД
│ ├── env.py
│ └── versions/
├── alembic.ini
├── Dockerfile
├── requirements.txt
└── .env.example
└── .env
```
## Быстрый старт
### 1. Запуск через Docker Compose (рекомендуется)
**Шаг 1: Запустить инфраструктуру (PostgreSQL)**
```bash
# Запустить все сервисы (PostgreSQL + приложение)
# Из корня проекта
cd infra
docker-compose up -d
# Проверить, что PostgreSQL запущен
docker-compose ps
```
**Шаг 2: Запустить микросервис ingest-service**
```bash
# Из папки микросервиса
cd ../services/ingest-service
docker-compose up -d
# Проверить логи
docker-compose logs -f app
docker-compose logs -f
# Остановить сервисы
# Проверить статус
docker-compose ps
```
**Остановка сервисов:**
```bash
# Остановить микросервис
cd services/ingest-service
docker-compose down
# Остановить инфраструктуру
cd ../../infra
docker-compose down
```
@@ -51,6 +81,13 @@ Swagger документация: http://localhost:8000/docs
### 2. Локальный запуск для разработки
```bash
# Запустить только PostgreSQL через Docker
cd ../../infra
docker-compose up -d postgres
# Вернуться в папку сервиса
cd ../services/ingest-service
# Создать виртуальное окружение
python -m venv venv
source venv/bin/activate # для Linux/Mac
@@ -60,11 +97,11 @@ venv\Scripts\activate # для Windows
# Установить зависимости
pip install -r requirements.txt
# Запустить только PostgreSQL через Docker
docker-compose up -d postgres
# Убедиться, что файл .env настроен правильно
# DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
# Создать файл .env
cp .env.example .env
# Применить миграции
alembic upgrade head
# Запустить приложение
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
@@ -79,7 +116,7 @@ Webhook для приема событий звонков от UIS
```json
{
"eventType": "call_completed",
"call_session_id": "12345-abcde",
"call_session_id": 12345,
"direction": "in",
"employee_id": 100,
"employee_full_name": "Иванов Иван Иванович",
@@ -94,45 +131,49 @@ Webhook для приема событий звонков от UIS
}
```
### GET /v1/uis/events
### GET /v1/uis/events (TODO)
Получить список всех событий звонков
**Query параметры:**
- `skip` - количество пропускаемых записей (пагинация)
- `limit` - максимальное количество записей (по умолчанию 100)
### GET /v1/uis/events/{call_session_id}
римечание: Endpoint запланирован к реализации_
### GET /v1/uis/events/{call_session_id} (TODO)
Получить конкретное событие звонка по session_id
### GET /v1/uis/events/employee/{employee_id}
римечание: Endpoint запланирован к реализации_
### GET /v1/uis/events/employee/{employee_id} (TODO)
Получить все звонки конкретного сотрудника
**Query параметры:**
- `skip` - количество пропускаемых записей
- `limit` - максимальное количество записей
римечание: Endpoint запланирован к реализации_
## База данных
### Модель данных CallEvent
Таблица `call_events` содержит следующие поля:
- `id` - уникальный идентификатор (автоинкремент)
- `event_type` - тип события
- `call_session_id` - уникальный ID сессии звонка
- `id` - уникальный идентификатор (UUID)
- `call_session_id` - уникальный ID сессии звонка (BigInteger, индексируется)
- `direction` - направление звонка (in/out)
- `notification_mnemonic` - мнемоника уведомления (тип события)
- `last_answered_employee_full_name` - ФИО сотрудника, ответившего на звонок
- `employee_id` - ID сотрудника
- `employee_full_name` - ФИО сотрудника
- `contact_phone_number` - телефон контакта
- `called_phone_number` - набранный телефон
- `communication_group_name` - название группы коммуникации
- `start_time` - время начала звонка
- `finish_time` - время окончания звонка
- `finish_time` - время окончания звонка (Unix timestamp)
- `total_time_duration` - общая длительность звонка (в секундах)
- `wait_time_duration` - длительность ожидания (в секундах)
- `total_wait_time_duration` - общая длительность ожидания (в секундах)
- `talk_time_duration` - длительность разговора (в секундах)
- `clean_talk_time_duration` - чистая длительность разговора (в секундах)
- `full_record_file_link` - ссылка на запись звонка
- `campaign_name` - название кампании
- `created_at` - дата создания записи
- `updated_at` - дата последнего обновления
- `tcm_topcrm_notification_name` - название уведомления TCM/TopCRM (название кампании)
### Подключение к PostgreSQL

View File

@@ -0,0 +1,27 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -0,0 +1,33 @@
"""add_unique_constraint_to_call_session_id
Revision ID: 9163176d6848
Revises: a7e5c5ef6bc1
Create Date: 2025-11-20 23:28:40.770696
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '9163176d6848'
down_revision: Union[str, Sequence[str], None] = 'a7e5c5ef6bc1'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=True)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False)
# ### end Alembic commands ###

View File

@@ -1,39 +1,79 @@
from datetime import datetime
from enum import Enum
from typing import List
import logging
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, HttpUrl
from pydantic import BaseModel, HttpUrl, Field, field_validator
from sqlalchemy.ext.asyncio import AsyncSession
from app.infrastructure.database import get_db
from app import crud
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
logger = logging.getLogger(__name__)
class CallDirection(str, Enum):
"""Направление звонка"""
in_ = "in"
out = "out"
class UisCallEvent(BaseModel):
eventType: str
call_session_id: int
direction: CallDirection
employee_id: int
employee_full_name: str
contact_phone_number: str
called_phone_number: str
communication_group_name: str
start_time: datetime
finish_time: datetime
talk_time_duration: int
full_record_file_link: HttpUrl
campaign_name: str
"""Схема события звонка от UIS"""
eventType: str = Field(..., description="Тип события")
call_session_id: int = Field(..., gt=0, description="Уникальный ID сессии звонка")
direction: CallDirection = Field(..., description="Направление звонка")
employee_id: int = Field(..., gt=0, description="ID сотрудника")
employee_full_name: str = Field(..., min_length=1, description="ФИО сотрудника")
contact_phone_number: str = Field(..., description="Телефон контакта")
called_phone_number: str = Field(..., description="Набранный телефон")
communication_group_name: str = Field(..., description="Группа коммуникации")
start_time: datetime = Field(..., description="Время начала звонка")
finish_time: datetime = Field(..., description="Время окончания звонка")
talk_time_duration: int = Field(..., ge=0, description="Длительность разговора (сек)")
full_record_file_link: HttpUrl = Field(..., description="Ссылка на запись")
campaign_name: str = Field(..., description="Название кампании")
@field_validator('finish_time')
@classmethod
def validate_finish_time(cls, v: datetime, info) -> datetime:
"""Проверяем, что finish_time >= start_time"""
if 'start_time' in info.data and v < info.data['start_time']:
raise ValueError('finish_time must be greater than or equal to start_time')
return v
router = APIRouter()
@router.post("/webhook", status_code=204)
async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None:
"""Webhook для получения событий звонков от UIS"""
async def create_call_event(
callEvent: UisCallEvent,
db: AsyncSession = Depends(get_db)
) -> None:
"""
Webhook для получения событий звонков от UIS
**Обрабатывает события:**
- Сохраняет событие звонка в базу данных
- Проверяет уникальность call_session_id
- Логирует успешную обработку
**Возможные ошибки:**
- 409 Conflict: Событие с таким call_session_id уже существует
- 422 Unprocessable Entity: Ошибка валидации данных
- 500 Internal Server Error: Ошибка сервера/БД
"""
logger.info(
f"Received webhook event",
extra={
"event_type": callEvent.eventType,
"call_session_id": callEvent.call_session_id,
"employee_id": callEvent.employee_id
}
)
# Преобразуем UisCallEvent в данные для CallEvent (БД)
event_data = {
@@ -53,5 +93,5 @@ async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(
"clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение
}
# Сохраняем в БД
# Сохраняем в БД (исключения обрабатываются в exception handlers)
await crud.create_call_event(db, event_data)

View File

@@ -0,0 +1,29 @@
"""Кастомные исключения для приложения"""
class AppException(Exception):
"""Базовое исключение приложения"""
def __init__(self, message: str, details: dict = None):
self.message = message
self.details = details or {}
super().__init__(self.message)
class CallEventAlreadyExistsError(AppException):
"""Событие звонка с таким call_session_id уже существует"""
def __init__(self, call_session_id: int):
super().__init__(
message=f"Call event with session_id {call_session_id} already exists",
details={"call_session_id": call_session_id}
)
class DatabaseError(AppException):
"""Ошибка работы с базой данных"""
pass
class ValidationError(AppException):
"""Ошибка валидации данных"""
pass

View File

@@ -1,11 +1,61 @@
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from asyncpg.exceptions import UniqueViolationError
import logging
from app.models.call_event import CallEvent
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
logger = logging.getLogger(__name__)
async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent:
"""Создать новое событие звонка в БД"""
call_event = CallEvent(**event_data)
db.add(call_event)
await db.commit()
await db.refresh(call_event)
return call_event
"""Создать новое событие звонка в БД
Args:
db: Async сессия базы данных
event_data: Данные события звонка
Returns:
CallEvent: Созданное событие
Raises:
CallEventAlreadyExistsError: Если событие с таким call_session_id уже существует
DatabaseError: При других ошибках БД
"""
try:
call_event = CallEvent(**event_data)
db.add(call_event)
await db.commit()
await db.refresh(call_event)
logger.info(
f"Call event created successfully",
extra={
"call_session_id": call_event.call_session_id,
"employee_id": call_event.employee_id
}
)
return call_event
except IntegrityError as e:
await db.rollback()
# Проверяем, является ли это ошибкой уникальности call_session_id
if isinstance(e.orig, UniqueViolationError):
call_session_id = event_data.get("call_session_id")
logger.warning(
f"Duplicate call event detected",
extra={"call_session_id": call_session_id}
)
raise CallEventAlreadyExistsError(call_session_id)
# Другие ошибки целостности
logger.error(f"Database integrity error: {str(e)}")
raise DatabaseError(f"Failed to create call event: {str(e)}")
except Exception as e:
await db.rollback()
logger.error(f"Unexpected error creating call event: {str(e)}", exc_info=True)
raise DatabaseError(f"Unexpected database error: {str(e)}")

View File

@@ -2,13 +2,28 @@ import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db")
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "yes")
# Единый async движок для всего приложения
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# echo=True включается только в режиме DEBUG
engine = create_async_engine(
DATABASE_URL,
echo=DEBUG,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
Base = declarative_base()

View File

@@ -1,6 +1,17 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
import logging
from app.api.uis import router as uis_router
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI(
title="Ingest Service API",
@@ -8,4 +19,68 @@ app = FastAPI(
version="1.0.0",
)
# Exception handlers
@app.exception_handler(CallEventAlreadyExistsError)
async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError):
"""Обработчик для дубликатов событий звонков"""
logger.warning(f"Duplicate call event: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_409_CONFLICT,
content={
"error": "duplicate_call_event",
"message": exc.message,
"details": exc.details
}
)
@app.exception_handler(DatabaseError)
async def database_error_handler(request: Request, exc: DatabaseError):
"""Обработчик для ошибок БД"""
logger.error(f"Database error: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "database_error",
"message": "Internal database error occurred",
"details": exc.details
}
)
@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
"""Обработчик для ошибок валидации"""
logger.warning(f"Validation error: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error": "validation_error",
"message": exc.message,
"details": exc.details
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Общий обработчик для неперехваченных исключений"""
logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "internal_server_error",
"message": "An unexpected error occurred"
}
)
# Health check endpoint
@app.get("/health", tags=["Health"])
async def health_check():
"""Проверка состояния сервиса"""
return {"status": "healthy", "service": "ingest-service"}
app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"])

View File

@@ -1,35 +0,0 @@
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Enum as SQLEnum
from app.infrastructure import Base
import enum
class CallDirectionEnum(str, enum.Enum):
in_ = "in"
out = "out"
class CallEvent(Base):
"""Модель для хранения событий звонков"""
__tablename__ = "call_events"
id = Column(Integer, primary_key=True, index=True)
event_type = Column(String, nullable=False)
call_session_id = Column(String, unique=True, index=True, nullable=False)
direction = Column(SQLEnum(CallDirectionEnum), nullable=False)
employee_id = Column(Integer, nullable=False, index=True)
employee_full_name = Column(String, nullable=False)
contact_phone_number = Column(String, nullable=False)
called_phone_number = Column(String, nullable=False)
communication_group_name = Column(String)
start_time = Column(DateTime, nullable=False)
finish_time = Column(DateTime, nullable=False)
talk_time_duration = Column(Integer, nullable=False)
full_record_file_link = Column(String, nullable=False)
campaign_name = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<CallEvent(id={self.id}, call_session_id={self.call_session_id})>"

View File

@@ -7,7 +7,7 @@ class CallEvent(Base):
__tablename__ = "call_events"
id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
call_session_id = Column(BigInteger, nullable=False, index=True)
call_session_id = Column(BigInteger, nullable=False, unique=True, index=True)
direction = Column(String, nullable=False)
notification_mnemonic = Column(String, nullable=False)
last_answered_employee_full_name = Column(String, nullable=True)

View File

@@ -0,0 +1,33 @@
version: "3.8"
services:
ingest-service:
build:
context: .
dockerfile: Dockerfile
container_name: ingest-service
environment:
# Подключение к PostgreSQL из infra/docker-compose.yaml
DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db
APP_HOST: 0.0.0.0
APP_PORT: 8000
DEBUG: "False"
ports:
- "8000:8000"
networks:
- call-review-network
restart: unless-stopped
# Запускаем миграции перед стартом приложения
command: >
sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()"]
interval: 30s
timeout: 3s
start_period: 10s
retries: 3
networks:
call-review-network:
external: true
name: call-review-network