Compare commits

...

11 Commits

24 changed files with 791 additions and 114 deletions

46
ansible/playbook.yml Normal file
View File

@@ -0,0 +1,46 @@
---
- hosts: dev
become: yes
become_method: sudo
roles:
- docker
tasks:
- name: Enable 22 port ufw
ufw:
rule: allow
port: "22"
proto: tcp
state: enabled
- name: Enable 80 port ufw
ufw:
rule: allow
port: "80"
proto: tcp
state: enabled
- name: Copy repository to server
copy:
src: "{{ item.src }}"
dest: "{{ item.dest }}"
loop:
- {
src: "{{ playbook_dir }}/../infra",
dest: "/home/{{ ansible_user }}/call-review-platform/infra",
}
- {
src: "{{ playbook_dir }}/../services",
dest: "/home/{{ ansible_user }}/call-review-platform/services",
}
- name: "Deploy infra"
tags: deploy_infra
community.docker.docker_compose_v2:
project_src: "/home/{{ ansible_user }}/call-review-platform/infra/infra"
state: present
recreate: always
- name: "Deploy ingest service"
tags: deploy_ingest
community.docker.docker_compose_v2:
project_src: "/home/{{ ansible_user }}/call-review-platform/services/services/ingest-service"
state: present
build: always
recreate: always

View File

@@ -0,0 +1,31 @@
---
- name: Install required packages
apt:
name:
- apt-transport-https
- ca-certificates
- curl
- gnupg
state: present
update_cache: yes
- name: Add Docker GPG key
apt_key:
url: https://download.docker.com/linux/ubuntu/gpg
state: present
- name: Add Docker repository
apt_repository:
repo: "deb [arch=amd64] https://download.docker.com/linux/ubuntu focal stable"
state: present
- name: Install Docker and Compose plugin
apt:
name:
- docker-ce
- docker-ce-cli
- containerd.io
- docker-buildx-plugin
- docker-compose-plugin
state: present
update_cache: yes

View File

@@ -1,6 +1,15 @@
version: "3.8"
services: services:
nginx:
image: nginx:latest
container_name: nginx
ports:
- "80:80"
volumes:
- ./nginx/conf.d:/etc/nginx/conf.d:ro
depends_on:
- minio
networks:
- call-review-network
postgres: postgres:
image: postgres:15-alpine image: postgres:15-alpine
container_name: ingest-postgres container_name: ingest-postgres
@@ -8,8 +17,6 @@ services:
POSTGRES_DB: ingest_db POSTGRES_DB: ingest_db
POSTGRES_USER: postgres POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres POSTGRES_PASSWORD: postgres
ports:
- "5432:5432"
volumes: volumes:
- postgres_data:/var/lib/postgresql/data - postgres_data:/var/lib/postgresql/data
healthcheck: healthcheck:
@@ -17,6 +24,24 @@ services:
interval: 10s interval: 10s
timeout: 5s timeout: 5s
retries: 5 retries: 5
networks:
- call-review-network
minio:
image: minio/minio
container_name: minio
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: password123
command: server /data --console-address ":9001"
volumes:
- minio_data:/data
networks:
- call-review-network
volumes: volumes:
postgres_data: postgres_data:
minio_data:
networks:
call-review-network:
name: call-review-network
driver: bridge

View File

@@ -0,0 +1,12 @@
server {
listen 80;
server_name crs-ingest-service.petrovskiy.ru;
location / {
proxy_pass http://ingest-service:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}

View File

@@ -0,0 +1,33 @@
server {
listen 80;
server_name s3.bikmeefftest.ru;
location / {
proxy_pass http://minio:9000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
server {
listen 80;
server_name crs-minio.petrovskiy.ru;
location / {
proxy_pass http://minio:9001;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}

View File

@@ -0,0 +1,40 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
venv/
env/
ENV/
.venv
# IDEs
.vscode/
.idea/
*.swp
*.swo
*~
# Git
.git/
.gitignore
# Documentation
README.md
*.md
# Environment files
.env
.env.*
# Testing
.pytest_cache/
.coverage
htmlcov/
*.log
# OS
.DS_Store
Thumbs.db

View File

@@ -4,4 +4,5 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
# Application # Application
APP_HOST=0.0.0.0 APP_HOST=0.0.0.0
APP_PORT=8000 APP_PORT=8000
DEBUG=False

View File

@@ -1,11 +1,40 @@
FROM python:3.9-slim # Multi-stage build для уменьшения размера образа
FROM python:3.9-slim as builder
WORKDIR /app WORKDIR /app
COPY requirements.txt . COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt RUN pip install --user --no-cache-dir -r requirements.txt
COPY app ./app # Финальный образ
FROM python:3.9-slim
WORKDIR /app
# Создаем непривилегированного пользователя для безопасности
RUN useradd -m -u 1000 appuser && \
chown -R appuser:appuser /app
# Копируем установленные пакеты из builder stage
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
# Копируем код приложения
COPY --chown=appuser:appuser app ./app
# Копируем файлы миграций Alembic
COPY --chown=appuser:appuser alembic ./alembic
COPY --chown=appuser:appuser alembic.ini .
# Переключаемся на непривилегированного пользователя
USER appuser
# Добавляем установленные пакеты в PATH
ENV PATH=/home/appuser/.local/bin:$PATH
# Healthcheck для проверки состояния сервиса
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()" || exit 1
# Запуск приложения
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -17,31 +17,73 @@ ingest-service/
├── app/ ├── app/
│ ├── __init__.py │ ├── __init__.py
│ ├── main.py # Главный файл приложения │ ├── main.py # Главный файл приложения
│ ├── database.py # Конфигурация БД
│ ├── models.py # SQLAlchemy модели
│ ├── crud.py # CRUD операции │ ├── crud.py # CRUD операции
── api/ ── api/
│ │ ├── __init__.py
│ │ └── uis.py # API endpoints для UIS
│ ├── infrastructure/
│ │ ├── __init__.py
│ │ └── database.py # Конфигурация БД
│ └── models/
│ ├── __init__.py │ ├── __init__.py
│ └── uis.py # API endpoints для UIS │ └── call_event.py # SQLAlchemy модели
├── docker-compose.yaml ├── alembic/ # Миграции БД
│ ├── env.py
│ └── versions/
├── alembic.ini
├── Dockerfile ├── Dockerfile
├── requirements.txt ├── requirements.txt
└── .env.example └── .env
``` ```
## Быстрый старт ## Быстрый старт
### 1. Запуск через Docker Compose (рекомендуется) ### 1. Запуск через Docker Compose (рекомендуется)
> ⚠️ **ВАЖНО**: Необходимо сначала запустить инфраструктуру (PostgreSQL), которая создаст общую сеть `call-review-network`, а затем микросервис.
**Шаг 1: Запустить инфраструктуру (PostgreSQL)**
```bash ```bash
# Запустить все сервисы (PostgreSQL + приложение) # Из корня проекта
docker-compose up -d docker-compose -f infra/docker-compose.yaml up -d
# Проверить логи # Проверить, что PostgreSQL запущен и сеть создана
docker-compose logs -f app docker network ls | grep call-review
docker ps --filter "name=ingest-postgres"
```
# Остановить сервисы **Шаг 2: Запустить микросервис ingest-service**
docker-compose down ```bash
# Из корня проекта
docker-compose -f services/ingest-service/docker-compose.yaml up -d
# Проверить логи (увидите применение миграций и старт сервиса)
docker-compose -f services/ingest-service/docker-compose.yaml logs -f
# Проверить статус
docker ps --filter "name=ingest"
```
**Проверка работы:**
```bash
# Health check
curl http://localhost:8000/health
# Swagger UI
open http://localhost:8000/docs
```
**Остановка сервисов:**
```bash
# Остановить микросервис (из корня проекта)
docker-compose -f services/ingest-service/docker-compose.yaml down
# Остановить инфраструктуру (ВНИМАНИЕ: остановит PostgreSQL!)
docker-compose -f infra/docker-compose.yaml down
# Или остановить всё сразу
docker-compose -f services/ingest-service/docker-compose.yaml down && \
docker-compose -f infra/docker-compose.yaml down
``` ```
Приложение будет доступно по адресу: http://localhost:8000 Приложение будет доступно по адресу: http://localhost:8000
@@ -51,6 +93,13 @@ Swagger документация: http://localhost:8000/docs
### 2. Локальный запуск для разработки ### 2. Локальный запуск для разработки
```bash ```bash
# Запустить только PostgreSQL через Docker
cd ../../infra
docker-compose up -d postgres
# Вернуться в папку сервиса
cd ../services/ingest-service
# Создать виртуальное окружение # Создать виртуальное окружение
python -m venv venv python -m venv venv
source venv/bin/activate # для Linux/Mac source venv/bin/activate # для Linux/Mac
@@ -60,11 +109,11 @@ venv\Scripts\activate # для Windows
# Установить зависимости # Установить зависимости
pip install -r requirements.txt pip install -r requirements.txt
# Запустить только PostgreSQL через Docker # Убедиться, что файл .env настроен правильно
docker-compose up -d postgres # DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
# Создать файл .env # Применить миграции
cp .env.example .env alembic upgrade head
# Запустить приложение # Запустить приложение
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
@@ -79,7 +128,7 @@ Webhook для приема событий звонков от UIS
```json ```json
{ {
"eventType": "call_completed", "eventType": "call_completed",
"call_session_id": "12345-abcde", "call_session_id": 12345,
"direction": "in", "direction": "in",
"employee_id": 100, "employee_id": 100,
"employee_full_name": "Иванов Иван Иванович", "employee_full_name": "Иванов Иван Иванович",
@@ -94,45 +143,49 @@ Webhook для приема событий звонков от UIS
} }
``` ```
### GET /v1/uis/events ### GET /v1/uis/events (TODO)
Получить список всех событий звонков Получить список всех событий звонков
**Query параметры:** **Query параметры:**
- `skip` - количество пропускаемых записей (пагинация) - `skip` - количество пропускаемых записей (пагинация)
- `limit` - максимальное количество записей (по умолчанию 100) - `limit` - максимальное количество записей (по умолчанию 100)
### GET /v1/uis/events/{call_session_id} римечание: Endpoint запланирован к реализации_
### GET /v1/uis/events/{call_session_id} (TODO)
Получить конкретное событие звонка по session_id Получить конкретное событие звонка по session_id
### GET /v1/uis/events/employee/{employee_id} римечание: Endpoint запланирован к реализации_
### GET /v1/uis/events/employee/{employee_id} (TODO)
Получить все звонки конкретного сотрудника Получить все звонки конкретного сотрудника
**Query параметры:** **Query параметры:**
- `skip` - количество пропускаемых записей - `skip` - количество пропускаемых записей
- `limit` - максимальное количество записей - `limit` - максимальное количество записей
римечание: Endpoint запланирован к реализации_
## База данных ## База данных
### Модель данных CallEvent ### Модель данных CallEvent
Таблица `call_events` содержит следующие поля: Таблица `call_events` содержит следующие поля:
- `id` - уникальный идентификатор (автоинкремент) - `id` - уникальный идентификатор (UUID)
- `event_type` - тип события - `call_session_id` - уникальный ID сессии звонка (BigInteger, индексируется)
- `call_session_id` - уникальный ID сессии звонка
- `direction` - направление звонка (in/out) - `direction` - направление звонка (in/out)
- `notification_mnemonic` - мнемоника уведомления (тип события)
- `last_answered_employee_full_name` - ФИО сотрудника, ответившего на звонок
- `employee_id` - ID сотрудника - `employee_id` - ID сотрудника
- `employee_full_name` - ФИО сотрудника - `finish_time` - время окончания звонка (Unix timestamp)
- `contact_phone_number` - телефон контакта - `total_time_duration` - общая длительность звонка (в секундах)
- `called_phone_number` - набранный телефон - `wait_time_duration` - длительность ожидания (в секундах)
- `communication_group_name` - название группы коммуникации - `total_wait_time_duration` - общая длительность ожидания (в секундах)
- `start_time` - время начала звонка
- `finish_time` - время окончания звонка
- `talk_time_duration` - длительность разговора (в секундах) - `talk_time_duration` - длительность разговора (в секундах)
- `clean_talk_time_duration` - чистая длительность разговора (в секундах)
- `full_record_file_link` - ссылка на запись звонка - `full_record_file_link` - ссылка на запись звонка
- `campaign_name` - название кампании - `tcm_topcrm_notification_name` - название уведомления TCM/TopCRM (название кампании)
- `created_at` - дата создания записи
- `updated_at` - дата последнего обновления
### Подключение к PostgreSQL ### Подключение к PostgreSQL

View File

@@ -0,0 +1,27 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@@ -1,8 +1,8 @@
"""Initial migration with call_events table """INIT
Revision ID: a7e5c5ef6bc1 Revision ID: 49dc6736230f
Revises: Revises:
Create Date: 2025-11-19 22:43:33.739763 Create Date: 2025-12-06 13:21:32.669444
""" """
from typing import Sequence, Union from typing import Sequence, Union
@@ -12,14 +12,13 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision: str = 'a7e5c5ef6bc1' revision: str = '49dc6736230f'
down_revision: Union[str, Sequence[str], None] = None down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None: def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.create_table('call_events', op.create_table('call_events',
sa.Column('id', sa.UUID(), nullable=False), sa.Column('id', sa.UUID(), nullable=False),
@@ -36,17 +35,20 @@ def upgrade() -> None:
sa.Column('clean_talk_time_duration', sa.Integer(), nullable=False), sa.Column('clean_talk_time_duration', sa.Integer(), nullable=False),
sa.Column('full_record_file_link', sa.String(), nullable=False), sa.Column('full_record_file_link', sa.String(), nullable=False),
sa.Column('tcm_topcrm_notification_name', sa.String(), nullable=False), sa.Column('tcm_topcrm_notification_name', sa.String(), nullable=False),
sa.Column('s3_key', sa.String(), nullable=True),
sa.Column('processing_status', sa.String(), nullable=False),
sa.Column('retries', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id') sa.PrimaryKeyConstraint('id')
) )
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False) op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=True)
op.create_index(op.f('ix_call_events_id'), 'call_events', ['id'], unique=False) op.create_index(op.f('ix_call_events_id'), 'call_events', ['id'], unique=False)
# ### end Alembic commands ### # ### end Alembic commands ###
def downgrade() -> None: def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_call_events_id'), table_name='call_events') op.drop_index(op.f('ix_call_events_id'), table_name='call_events')
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events') op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
op.drop_table('call_events') op.drop_table('call_events')
# ### end Alembic commands ### # ### end Alembic commands ###

View File

@@ -1,39 +1,79 @@
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import List from typing import List
import logging
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, HttpUrl from pydantic import BaseModel, HttpUrl, Field, field_validator
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.infrastructure.database import get_db from app.infrastructure.database import get_db
from app import crud from app import crud
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
logger = logging.getLogger(__name__)
class CallDirection(str, Enum): class CallDirection(str, Enum):
"""Направление звонка"""
in_ = "in" in_ = "in"
out = "out" out = "out"
class UisCallEvent(BaseModel): class UisCallEvent(BaseModel):
eventType: str """Схема события звонка от UIS"""
call_session_id: int eventType: str = Field(..., description="Тип события")
direction: CallDirection call_session_id: int = Field(..., gt=0, description="Уникальный ID сессии звонка")
employee_id: int direction: CallDirection = Field(..., description="Направление звонка")
employee_full_name: str employee_id: int = Field(..., gt=0, description="ID сотрудника")
contact_phone_number: str employee_full_name: str = Field(..., min_length=1, description="ФИО сотрудника")
called_phone_number: str contact_phone_number: str = Field(..., description="Телефон контакта")
communication_group_name: str called_phone_number: str = Field(..., description="Набранный телефон")
start_time: datetime communication_group_name: str = Field(..., description="Группа коммуникации")
finish_time: datetime start_time: datetime = Field(..., description="Время начала звонка")
talk_time_duration: int finish_time: datetime = Field(..., description="Время окончания звонка")
full_record_file_link: HttpUrl talk_time_duration: int = Field(..., ge=0, description="Длительность разговора (сек)")
campaign_name: str full_record_file_link: HttpUrl = Field(..., description="Ссылка на запись")
campaign_name: str = Field(..., description="Название кампании")
@field_validator('finish_time')
@classmethod
def validate_finish_time(cls, v: datetime, info) -> datetime:
"""Проверяем, что finish_time >= start_time"""
if 'start_time' in info.data and v < info.data['start_time']:
raise ValueError('finish_time must be greater than or equal to start_time')
return v
router = APIRouter() router = APIRouter()
@router.post("/webhook", status_code=204) @router.post("/webhook", status_code=204)
async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None: async def create_call_event(
"""Webhook для получения событий звонков от UIS""" callEvent: UisCallEvent,
db: AsyncSession = Depends(get_db)
) -> None:
"""
Webhook для получения событий звонков от UIS
**Обрабатывает события:**
- Сохраняет событие звонка в базу данных
- Проверяет уникальность call_session_id
- Логирует успешную обработку
**Возможные ошибки:**
- 409 Conflict: Событие с таким call_session_id уже существует
- 422 Unprocessable Entity: Ошибка валидации данных
- 500 Internal Server Error: Ошибка сервера/БД
"""
logger.info(
f"Received webhook event",
extra={
"event_type": callEvent.eventType,
"call_session_id": callEvent.call_session_id,
"employee_id": callEvent.employee_id
}
)
# Преобразуем UisCallEvent в данные для CallEvent (БД) # Преобразуем UisCallEvent в данные для CallEvent (БД)
event_data = { event_data = {
@@ -53,5 +93,5 @@ async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(
"clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение "clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение
} }
# Сохраняем в БД # Сохраняем в БД (исключения обрабатываются в exception handlers)
await crud.create_call_event(db, event_data) await crud.create_call_event(db, event_data)

View File

@@ -0,0 +1,18 @@
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str
S3_ENDPOINT_URL: str
AWS_ACCESS_KEY_ID: str
AWS_SECRET_ACCESS_KEY: str
AWS_REGION: str
S3_BUCKET_NAME: str
MAX_RETRIES: int
RETRY_DELAY: int
# class Config:
# env_file = ".env"
# env_file_encoding = "utf-8"
settings = Settings()

View File

@@ -0,0 +1,29 @@
"""Кастомные исключения для приложения"""
class AppException(Exception):
"""Базовое исключение приложения"""
def __init__(self, message: str, details: dict = None):
self.message = message
self.details = details or {}
super().__init__(self.message)
class CallEventAlreadyExistsError(AppException):
"""Событие звонка с таким call_session_id уже существует"""
def __init__(self, call_session_id: int):
super().__init__(
message=f"Call event with session_id {call_session_id} already exists",
details={"call_session_id": call_session_id}
)
class DatabaseError(AppException):
"""Ошибка работы с базой данных"""
pass
class ValidationError(AppException):
"""Ошибка валидации данных"""
pass

View File

@@ -1,11 +1,61 @@
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from asyncpg.exceptions import UniqueViolationError
import logging
from app.models.call_event import CallEvent from app.models.call_event import CallEvent
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
logger = logging.getLogger(__name__)
async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent: async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent:
"""Создать новое событие звонка в БД""" """Создать новое событие звонка в БД
Args:
db: Async сессия базы данных
event_data: Данные события звонка
Returns:
CallEvent: Созданное событие
Raises:
CallEventAlreadyExistsError: Если событие с таким call_session_id уже существует
DatabaseError: При других ошибках БД
"""
try:
call_event = CallEvent(**event_data) call_event = CallEvent(**event_data)
db.add(call_event) db.add(call_event)
await db.commit() await db.commit()
await db.refresh(call_event) await db.refresh(call_event)
logger.info(
f"Call event created successfully",
extra={
"call_session_id": call_event.call_session_id,
"employee_id": call_event.employee_id
}
)
return call_event return call_event
except IntegrityError as e:
await db.rollback()
# Проверяем, является ли это ошибкой уникальности call_session_id
if isinstance(e.orig, UniqueViolationError):
call_session_id = event_data.get("call_session_id")
logger.warning(
f"Duplicate call event detected",
extra={"call_session_id": call_session_id}
)
raise CallEventAlreadyExistsError(call_session_id)
# Другие ошибки целостности
logger.error(f"Database integrity error: {str(e)}")
raise DatabaseError(f"Failed to create call event: {str(e)}")
except Exception as e:
await db.rollback()
logger.error(f"Unexpected error creating call event: {str(e)}", exc_info=True)
raise DatabaseError(f"Unexpected database error: {str(e)}")

View File

@@ -2,13 +2,28 @@ import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db") DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db")
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "yes")
# Единый async движок для всего приложения # Единый async движок для всего приложения
engine = create_async_engine(DATABASE_URL, echo=True) # echo=True включается только в режиме DEBUG
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) engine = create_async_engine(
DATABASE_URL,
echo=DEBUG,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
Base = declarative_base() Base = declarative_base()

View File

@@ -0,0 +1,67 @@
import aioboto3
import asyncio
import logging
import os
import aiohttp
from app.config import settings
logger = logging.getLogger(__name__)
async def download_and_upload_stream(url: str, key_name: str) -> bool:
"""
Скачивает аудиофайл по URL потоками и передаёт в upload_file_to_s3 для загрузки.
"""
try:
logger.info(f"[INFO] Начало скачивания {url}")
async with aiohttp.ClientSession() as http_session:
async with http_session.get(url) as resp:
resp.raise_for_status()
# читаем файл по частям и собираем в bytes
chunks = []
while True:
chunk = await resp.content.read(1024 * 1024) # 1 МБ
if not chunk:
break
chunks.append(chunk)
file_bytes = b"".join(chunks)
logger.info(f"[INFO] Скачивание {url} завершено, размер {len(file_bytes)} байт")
# передаём в функцию загрузки с retry
success = await upload_file_to_s3(file_bytes, key_name)
return success
except Exception as e:
logger.error(f"[FAILED] Не удалось скачать или загрузить {url}: {e}")
return False
async def upload_file_to_s3(file_bytes: bytes, key_name: str) -> bool:
"""
Асинхронно загружает файл в S3 с ретраями.
Возвращает True при успехе, False при ошибке.
"""
session = aioboto3.Session()
for attempt in range(1, settings.MAX_RETRIES + 1):
try:
logger.info(f"Попытка {attempt} загрузки {key_name} в S3")
#logger.info(f"Параметры S3: {S3_ENDPOINT_URL}, {S3_BUCKET_NAME}, {AWS_REGION}")
async with session.client(
"s3",
endpoint_url=settings.S3_ENDPOINT_URL,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_REGION
) as s3:
await s3.put_object(Bucket=settings.S3_BUCKET_NAME, Key=key_name, Body=file_bytes)
logger.info(f"[OK] Файл {key_name} загружен в S3!")
return True
except Exception as e:
logger.exception(f"[ERROR] Попытка {attempt} для {key_name} не удалась: {e}")
if attempt < settings.MAX_RETRIES:
await asyncio.sleep(settings.RETRY_DELAY)
else:
logger.error(f"[FAILED] Файл {key_name} не удалось загрузить после {settings.MAX_RETRIES} попыток")
return False

View File

@@ -0,0 +1,60 @@
import asyncio
import logging
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from app.models.call_event import CallEvent
from app.infrastructure.s3 import download_and_upload_stream
from app.infrastructure.database import get_db
from app.config import settings
logger = logging.getLogger(__name__)
BATCH_SIZE = 20
MAX_RETRIES_CALL = 3
async def fetch_pending_calls(session: AsyncSession):
stmt = (
select(CallEvent)
.where(CallEvent.processing_status == "pending")
.where(CallEvent.retries < MAX_RETRIES_CALL)
.order_by(CallEvent.id)
.limit(BATCH_SIZE)
.with_for_update(skip_locked=True)
)
result = await session.execute(stmt)
calls = result.scalars().all()
return calls
async def process_pending_calls():
logger.info("[INFO] Запуск обработки новых записей")
async for session in get_db():
calls = await fetch_pending_calls(session)
if not calls:
logger.info("[INFO] Нет новых записей для обработки")
return
# сразу увеличиваем retries и помечаем processing
for call in calls:
call.retries += 1
call.processing_status = "processing"
await session.commit()
# создаём задачи на скачивание/загрузку
tasks = []
for call in calls:
key_name = f"audio/{call.id}.mp3"
logger.info("Попытка скачать и загрузить запись %s", call.full_record_file_link)
tasks.append(download_and_upload_stream(call.full_record_file_link, key_name))
results = await asyncio.gather(*tasks, return_exceptions=True)
# обновляем статус после обработки
for call, result in zip(calls, results):
if isinstance(result, Exception) or result is False:
call.processing_status = "failed"
else:
call.processing_status = "done"
call.s3_key = f"audio/{call.id}.mp3"
await session.commit()
logger.info(f"Обработка пачки из {len(calls)} записей завершена")

View File

@@ -1,11 +1,102 @@
from contextlib import asynccontextmanager from fastapi import FastAPI, Request, status
from fastapi import FastAPI from fastapi.responses import JSONResponse
from fastapi.concurrency import asynccontextmanager
import logging
from app.infrastructure.scheduler import process_pending_calls
from app.api.uis import router as uis_router from app.api.uis import router as uis_router
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI
import asyncio
from app.infrastructure.database import get_db
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app:FastAPI):
scheduler = AsyncIOScheduler()
scheduler.add_job(process_pending_calls, "interval", seconds = 10)
scheduler.start()
yield
await scheduler.shutdown(wait=True)
app = FastAPI( app = FastAPI(
title="Ingest Service API", title="Ingest Service API",
description="Микросервис для приема событий звонков", description="Микросервис для приема событий звонков",
version="1.0.0", version="1.0.0",
lifespan=lifespan
) )
# Exception handlers
@app.exception_handler(CallEventAlreadyExistsError)
async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError):
"""Обработчик для дубликатов событий звонков"""
logger.warning(f"Duplicate call event: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_409_CONFLICT,
content={
"error": "duplicate_call_event",
"message": exc.message,
"details": exc.details
}
)
@app.exception_handler(DatabaseError)
async def database_error_handler(request: Request, exc: DatabaseError):
"""Обработчик для ошибок БД"""
logger.error(f"Database error: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "database_error",
"message": "Internal database error occurred",
"details": exc.details
}
)
@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
"""Обработчик для ошибок валидации"""
logger.warning(f"Validation error: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error": "validation_error",
"message": exc.message,
"details": exc.details
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Общий обработчик для неперехваченных исключений"""
logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "internal_server_error",
"message": "An unexpected error occurred"
}
)
# Health check endpoint
@app.get("/health", tags=["Health"])
async def health_check():
"""Проверка состояния сервиса"""
return {"status": "healthy", "service": "ingest-service"}
app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"]) app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"])

View File

@@ -1,35 +0,0 @@
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Enum as SQLEnum
from app.infrastructure import Base
import enum
class CallDirectionEnum(str, enum.Enum):
in_ = "in"
out = "out"
class CallEvent(Base):
"""Модель для хранения событий звонков"""
__tablename__ = "call_events"
id = Column(Integer, primary_key=True, index=True)
event_type = Column(String, nullable=False)
call_session_id = Column(String, unique=True, index=True, nullable=False)
direction = Column(SQLEnum(CallDirectionEnum), nullable=False)
employee_id = Column(Integer, nullable=False, index=True)
employee_full_name = Column(String, nullable=False)
contact_phone_number = Column(String, nullable=False)
called_phone_number = Column(String, nullable=False)
communication_group_name = Column(String)
start_time = Column(DateTime, nullable=False)
finish_time = Column(DateTime, nullable=False)
talk_time_duration = Column(Integer, nullable=False)
full_record_file_link = Column(String, nullable=False)
campaign_name = Column(String)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def __repr__(self):
return f"<CallEvent(id={self.id}, call_session_id={self.call_session_id})>"

View File

@@ -1,13 +1,15 @@
from sqlalchemy import Column, BigInteger, Integer, String, DateTime from sqlalchemy import Column, BigInteger, Integer, String, DateTime, Enum as SQLEnum
from enum import Enum
from sqlalchemy.dialects.postgresql import UUID as PG_UUID from sqlalchemy.dialects.postgresql import UUID as PG_UUID
import uuid import uuid
from app.infrastructure.database import Base from app.infrastructure.database import Base
from enum import Enum as PyEnum
class CallEvent(Base): class CallEvent(Base):
__tablename__ = "call_events" __tablename__ = "call_events"
id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True) id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
call_session_id = Column(BigInteger, nullable=False, index=True) call_session_id = Column(BigInteger, nullable=False, unique=True, index=True)
direction = Column(String, nullable=False) direction = Column(String, nullable=False)
notification_mnemonic = Column(String, nullable=False) notification_mnemonic = Column(String, nullable=False)
last_answered_employee_full_name = Column(String, nullable=True) last_answered_employee_full_name = Column(String, nullable=True)
@@ -20,3 +22,7 @@ class CallEvent(Base):
clean_talk_time_duration = Column(Integer, nullable=False) clean_talk_time_duration = Column(Integer, nullable=False)
full_record_file_link = Column(String, nullable=False) full_record_file_link = Column(String, nullable=False)
tcm_topcrm_notification_name = Column(String, nullable=False) tcm_topcrm_notification_name = Column(String, nullable=False)
s3_key = Column(String, nullable=True)
processing_status = Column(String, default="pending", nullable=False)
retries = Column(Integer, default=0, nullable=False)

View File

@@ -0,0 +1,35 @@
version: "3.8"
services:
ingest-service:
build:
context: .
dockerfile: Dockerfile
container_name: ingest-service
env_file:
- .env
ports:
- "8000:8000"
networks:
- call-review-network
restart: unless-stopped
# Запускаем миграции перед стартом приложения
command: >
sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"
healthcheck:
test:
[
"CMD",
"python",
"-c",
"import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()",
]
interval: 30s
timeout: 3s
start_period: 10s
retries: 3
networks:
call-review-network:
external: true
name: call-review-network

View File

@@ -5,4 +5,6 @@ sqlalchemy==2.0.23
asyncpg==0.29.0 asyncpg==0.29.0
alembic==1.13.1 alembic==1.13.1
python-dotenv==1.0.0 python-dotenv==1.0.0
aioboto3==15.5.0
apscheduler==3.11.1
pydantic_settings==2.11.0