Compare commits
11 Commits
1538f72c8f
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 6b09daab61 | |||
| a5d3a72d0c | |||
| 3cc360c8b9 | |||
| f72451cdd8 | |||
| bceb06b5b6 | |||
| 6862ea5760 | |||
| 3d4430134b | |||
| 5d5b6140d4 | |||
| f2fca50d5a | |||
| 6691a1a98e | |||
| 5b3298858c |
46
ansible/playbook.yml
Normal file
46
ansible/playbook.yml
Normal file
@@ -0,0 +1,46 @@
|
||||
---
|
||||
- hosts: dev
|
||||
become: yes
|
||||
become_method: sudo
|
||||
roles:
|
||||
- docker
|
||||
tasks:
|
||||
- name: Enable 22 port ufw
|
||||
ufw:
|
||||
rule: allow
|
||||
port: "22"
|
||||
proto: tcp
|
||||
state: enabled
|
||||
|
||||
- name: Enable 80 port ufw
|
||||
ufw:
|
||||
rule: allow
|
||||
port: "80"
|
||||
proto: tcp
|
||||
state: enabled
|
||||
- name: Copy repository to server
|
||||
copy:
|
||||
src: "{{ item.src }}"
|
||||
dest: "{{ item.dest }}"
|
||||
loop:
|
||||
- {
|
||||
src: "{{ playbook_dir }}/../infra",
|
||||
dest: "/home/{{ ansible_user }}/call-review-platform/infra",
|
||||
}
|
||||
- {
|
||||
src: "{{ playbook_dir }}/../services",
|
||||
dest: "/home/{{ ansible_user }}/call-review-platform/services",
|
||||
}
|
||||
- name: "Deploy infra"
|
||||
tags: deploy_infra
|
||||
community.docker.docker_compose_v2:
|
||||
project_src: "/home/{{ ansible_user }}/call-review-platform/infra/infra"
|
||||
state: present
|
||||
recreate: always
|
||||
- name: "Deploy ingest service"
|
||||
tags: deploy_ingest
|
||||
community.docker.docker_compose_v2:
|
||||
project_src: "/home/{{ ansible_user }}/call-review-platform/services/services/ingest-service"
|
||||
state: present
|
||||
build: always
|
||||
recreate: always
|
||||
31
ansible/roles/docker/tasks/main.yml
Normal file
31
ansible/roles/docker/tasks/main.yml
Normal file
@@ -0,0 +1,31 @@
|
||||
---
|
||||
- name: Install required packages
|
||||
apt:
|
||||
name:
|
||||
- apt-transport-https
|
||||
- ca-certificates
|
||||
- curl
|
||||
- gnupg
|
||||
state: present
|
||||
update_cache: yes
|
||||
|
||||
- name: Add Docker GPG key
|
||||
apt_key:
|
||||
url: https://download.docker.com/linux/ubuntu/gpg
|
||||
state: present
|
||||
|
||||
- name: Add Docker repository
|
||||
apt_repository:
|
||||
repo: "deb [arch=amd64] https://download.docker.com/linux/ubuntu focal stable"
|
||||
state: present
|
||||
|
||||
- name: Install Docker and Compose plugin
|
||||
apt:
|
||||
name:
|
||||
- docker-ce
|
||||
- docker-ce-cli
|
||||
- containerd.io
|
||||
- docker-buildx-plugin
|
||||
- docker-compose-plugin
|
||||
state: present
|
||||
update_cache: yes
|
||||
@@ -1,6 +1,15 @@
|
||||
version: "3.8"
|
||||
|
||||
services:
|
||||
nginx:
|
||||
image: nginx:latest
|
||||
container_name: nginx
|
||||
ports:
|
||||
- "80:80"
|
||||
volumes:
|
||||
- ./nginx/conf.d:/etc/nginx/conf.d:ro
|
||||
depends_on:
|
||||
- minio
|
||||
networks:
|
||||
- call-review-network
|
||||
postgres:
|
||||
image: postgres:15-alpine
|
||||
container_name: ingest-postgres
|
||||
@@ -8,8 +17,6 @@ services:
|
||||
POSTGRES_DB: ingest_db
|
||||
POSTGRES_USER: postgres
|
||||
POSTGRES_PASSWORD: postgres
|
||||
ports:
|
||||
- "5432:5432"
|
||||
volumes:
|
||||
- postgres_data:/var/lib/postgresql/data
|
||||
healthcheck:
|
||||
@@ -17,6 +24,24 @@ services:
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
networks:
|
||||
- call-review-network
|
||||
minio:
|
||||
image: minio/minio
|
||||
container_name: minio
|
||||
environment:
|
||||
MINIO_ROOT_USER: admin
|
||||
MINIO_ROOT_PASSWORD: password123
|
||||
command: server /data --console-address ":9001"
|
||||
volumes:
|
||||
- minio_data:/data
|
||||
networks:
|
||||
- call-review-network
|
||||
volumes:
|
||||
postgres_data:
|
||||
minio_data:
|
||||
|
||||
networks:
|
||||
call-review-network:
|
||||
name: call-review-network
|
||||
driver: bridge
|
||||
|
||||
12
infra/nginx/conf.d/ingest-service.conf
Normal file
12
infra/nginx/conf.d/ingest-service.conf
Normal file
@@ -0,0 +1,12 @@
|
||||
server {
|
||||
listen 80;
|
||||
server_name crs-ingest-service.petrovskiy.ru;
|
||||
|
||||
location / {
|
||||
proxy_pass http://ingest-service:8000;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
}
|
||||
}
|
||||
33
infra/nginx/conf.d/minio.conf
Normal file
33
infra/nginx/conf.d/minio.conf
Normal file
@@ -0,0 +1,33 @@
|
||||
server {
|
||||
listen 80;
|
||||
server_name s3.bikmeefftest.ru;
|
||||
|
||||
location / {
|
||||
proxy_pass http://minio:9000;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Upgrade $http_upgrade;
|
||||
proxy_set_header Connection "upgrade";
|
||||
}
|
||||
}
|
||||
|
||||
server {
|
||||
listen 80;
|
||||
server_name crs-minio.petrovskiy.ru;
|
||||
|
||||
location / {
|
||||
proxy_pass http://minio:9001;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
|
||||
proxy_http_version 1.1;
|
||||
proxy_set_header Upgrade $http_upgrade;
|
||||
proxy_set_header Connection "upgrade";
|
||||
}
|
||||
}
|
||||
40
services/ingest-service/.dockerignore
Normal file
40
services/ingest-service/.dockerignore
Normal file
@@ -0,0 +1,40 @@
|
||||
# Python
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
*.so
|
||||
.Python
|
||||
venv/
|
||||
env/
|
||||
ENV/
|
||||
.venv
|
||||
|
||||
# IDEs
|
||||
.vscode/
|
||||
.idea/
|
||||
*.swp
|
||||
*.swo
|
||||
*~
|
||||
|
||||
# Git
|
||||
.git/
|
||||
.gitignore
|
||||
|
||||
# Documentation
|
||||
README.md
|
||||
*.md
|
||||
|
||||
# Environment files
|
||||
.env
|
||||
.env.*
|
||||
|
||||
# Testing
|
||||
.pytest_cache/
|
||||
.coverage
|
||||
htmlcov/
|
||||
*.log
|
||||
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
|
||||
@@ -4,4 +4,5 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
|
||||
# Application
|
||||
APP_HOST=0.0.0.0
|
||||
APP_PORT=8000
|
||||
DEBUG=False
|
||||
|
||||
|
||||
@@ -1,11 +1,40 @@
|
||||
FROM python:3.9-slim
|
||||
# Multi-stage build для уменьшения размера образа
|
||||
FROM python:3.9-slim as builder
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
RUN pip install --user --no-cache-dir -r requirements.txt
|
||||
|
||||
COPY app ./app
|
||||
# Финальный образ
|
||||
FROM python:3.9-slim
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Создаем непривилегированного пользователя для безопасности
|
||||
RUN useradd -m -u 1000 appuser && \
|
||||
chown -R appuser:appuser /app
|
||||
|
||||
# Копируем установленные пакеты из builder stage
|
||||
COPY --from=builder --chown=appuser:appuser /root/.local /home/appuser/.local
|
||||
|
||||
# Копируем код приложения
|
||||
COPY --chown=appuser:appuser app ./app
|
||||
|
||||
# Копируем файлы миграций Alembic
|
||||
COPY --chown=appuser:appuser alembic ./alembic
|
||||
COPY --chown=appuser:appuser alembic.ini .
|
||||
|
||||
# Переключаемся на непривилегированного пользователя
|
||||
USER appuser
|
||||
|
||||
# Добавляем установленные пакеты в PATH
|
||||
ENV PATH=/home/appuser/.local/bin:$PATH
|
||||
|
||||
# Healthcheck для проверки состояния сервиса
|
||||
HEALTHCHECK --interval=30s --timeout=3s --start-period=10s --retries=3 \
|
||||
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()" || exit 1
|
||||
|
||||
# Запуск приложения
|
||||
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
|
||||
|
||||
|
||||
@@ -16,32 +16,74 @@
|
||||
ingest-service/
|
||||
├── app/
|
||||
│ ├── __init__.py
|
||||
│ ├── main.py # Главный файл приложения
|
||||
│ ├── database.py # Конфигурация БД
|
||||
│ ├── models.py # SQLAlchemy модели
|
||||
│ ├── crud.py # CRUD операции
|
||||
│ └── api/
|
||||
│ ├── main.py # Главный файл приложения
|
||||
│ ├── crud.py # CRUD операции
|
||||
│ ├── api/
|
||||
│ │ ├── __init__.py
|
||||
│ │ └── uis.py # API endpoints для UIS
|
||||
│ ├── infrastructure/
|
||||
│ │ ├── __init__.py
|
||||
│ │ └── database.py # Конфигурация БД
|
||||
│ └── models/
|
||||
│ ├── __init__.py
|
||||
│ └── uis.py # API endpoints для UIS
|
||||
├── docker-compose.yaml
|
||||
│ └── call_event.py # SQLAlchemy модели
|
||||
├── alembic/ # Миграции БД
|
||||
│ ├── env.py
|
||||
│ └── versions/
|
||||
├── alembic.ini
|
||||
├── Dockerfile
|
||||
├── requirements.txt
|
||||
└── .env.example
|
||||
└── .env
|
||||
```
|
||||
|
||||
## Быстрый старт
|
||||
|
||||
### 1. Запуск через Docker Compose (рекомендуется)
|
||||
|
||||
> ⚠️ **ВАЖНО**: Необходимо сначала запустить инфраструктуру (PostgreSQL), которая создаст общую сеть `call-review-network`, а затем микросервис.
|
||||
|
||||
**Шаг 1: Запустить инфраструктуру (PostgreSQL)**
|
||||
```bash
|
||||
# Запустить все сервисы (PostgreSQL + приложение)
|
||||
docker-compose up -d
|
||||
# Из корня проекта
|
||||
docker-compose -f infra/docker-compose.yaml up -d
|
||||
|
||||
# Проверить логи
|
||||
docker-compose logs -f app
|
||||
# Проверить, что PostgreSQL запущен и сеть создана
|
||||
docker network ls | grep call-review
|
||||
docker ps --filter "name=ingest-postgres"
|
||||
```
|
||||
|
||||
# Остановить сервисы
|
||||
docker-compose down
|
||||
**Шаг 2: Запустить микросервис ingest-service**
|
||||
```bash
|
||||
# Из корня проекта
|
||||
docker-compose -f services/ingest-service/docker-compose.yaml up -d
|
||||
|
||||
# Проверить логи (увидите применение миграций и старт сервиса)
|
||||
docker-compose -f services/ingest-service/docker-compose.yaml logs -f
|
||||
|
||||
# Проверить статус
|
||||
docker ps --filter "name=ingest"
|
||||
```
|
||||
|
||||
**Проверка работы:**
|
||||
```bash
|
||||
# Health check
|
||||
curl http://localhost:8000/health
|
||||
|
||||
# Swagger UI
|
||||
open http://localhost:8000/docs
|
||||
```
|
||||
|
||||
**Остановка сервисов:**
|
||||
```bash
|
||||
# Остановить микросервис (из корня проекта)
|
||||
docker-compose -f services/ingest-service/docker-compose.yaml down
|
||||
|
||||
# Остановить инфраструктуру (ВНИМАНИЕ: остановит PostgreSQL!)
|
||||
docker-compose -f infra/docker-compose.yaml down
|
||||
|
||||
# Или остановить всё сразу
|
||||
docker-compose -f services/ingest-service/docker-compose.yaml down && \
|
||||
docker-compose -f infra/docker-compose.yaml down
|
||||
```
|
||||
|
||||
Приложение будет доступно по адресу: http://localhost:8000
|
||||
@@ -51,6 +93,13 @@ Swagger документация: http://localhost:8000/docs
|
||||
### 2. Локальный запуск для разработки
|
||||
|
||||
```bash
|
||||
# Запустить только PostgreSQL через Docker
|
||||
cd ../../infra
|
||||
docker-compose up -d postgres
|
||||
|
||||
# Вернуться в папку сервиса
|
||||
cd ../services/ingest-service
|
||||
|
||||
# Создать виртуальное окружение
|
||||
python -m venv venv
|
||||
source venv/bin/activate # для Linux/Mac
|
||||
@@ -60,11 +109,11 @@ venv\Scripts\activate # для Windows
|
||||
# Установить зависимости
|
||||
pip install -r requirements.txt
|
||||
|
||||
# Запустить только PostgreSQL через Docker
|
||||
docker-compose up -d postgres
|
||||
# Убедиться, что файл .env настроен правильно
|
||||
# DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
|
||||
|
||||
# Создать файл .env
|
||||
cp .env.example .env
|
||||
# Применить миграции
|
||||
alembic upgrade head
|
||||
|
||||
# Запустить приложение
|
||||
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
|
||||
@@ -79,7 +128,7 @@ Webhook для приема событий звонков от UIS
|
||||
```json
|
||||
{
|
||||
"eventType": "call_completed",
|
||||
"call_session_id": "12345-abcde",
|
||||
"call_session_id": 12345,
|
||||
"direction": "in",
|
||||
"employee_id": 100,
|
||||
"employee_full_name": "Иванов Иван Иванович",
|
||||
@@ -94,45 +143,49 @@ Webhook для приема событий звонков от UIS
|
||||
}
|
||||
```
|
||||
|
||||
### GET /v1/uis/events
|
||||
### GET /v1/uis/events (TODO)
|
||||
Получить список всех событий звонков
|
||||
|
||||
**Query параметры:**
|
||||
- `skip` - количество пропускаемых записей (пагинация)
|
||||
- `limit` - максимальное количество записей (по умолчанию 100)
|
||||
|
||||
### GET /v1/uis/events/{call_session_id}
|
||||
_Примечание: Endpoint запланирован к реализации_
|
||||
|
||||
### GET /v1/uis/events/{call_session_id} (TODO)
|
||||
Получить конкретное событие звонка по session_id
|
||||
|
||||
### GET /v1/uis/events/employee/{employee_id}
|
||||
_Примечание: Endpoint запланирован к реализации_
|
||||
|
||||
### GET /v1/uis/events/employee/{employee_id} (TODO)
|
||||
Получить все звонки конкретного сотрудника
|
||||
|
||||
**Query параметры:**
|
||||
- `skip` - количество пропускаемых записей
|
||||
- `limit` - максимальное количество записей
|
||||
|
||||
_Примечание: Endpoint запланирован к реализации_
|
||||
|
||||
## База данных
|
||||
|
||||
### Модель данных CallEvent
|
||||
|
||||
Таблица `call_events` содержит следующие поля:
|
||||
|
||||
- `id` - уникальный идентификатор (автоинкремент)
|
||||
- `event_type` - тип события
|
||||
- `call_session_id` - уникальный ID сессии звонка
|
||||
- `id` - уникальный идентификатор (UUID)
|
||||
- `call_session_id` - уникальный ID сессии звонка (BigInteger, индексируется)
|
||||
- `direction` - направление звонка (in/out)
|
||||
- `notification_mnemonic` - мнемоника уведомления (тип события)
|
||||
- `last_answered_employee_full_name` - ФИО сотрудника, ответившего на звонок
|
||||
- `employee_id` - ID сотрудника
|
||||
- `employee_full_name` - ФИО сотрудника
|
||||
- `contact_phone_number` - телефон контакта
|
||||
- `called_phone_number` - набранный телефон
|
||||
- `communication_group_name` - название группы коммуникации
|
||||
- `start_time` - время начала звонка
|
||||
- `finish_time` - время окончания звонка
|
||||
- `finish_time` - время окончания звонка (Unix timestamp)
|
||||
- `total_time_duration` - общая длительность звонка (в секундах)
|
||||
- `wait_time_duration` - длительность ожидания (в секундах)
|
||||
- `total_wait_time_duration` - общая длительность ожидания (в секундах)
|
||||
- `talk_time_duration` - длительность разговора (в секундах)
|
||||
- `clean_talk_time_duration` - чистая длительность разговора (в секундах)
|
||||
- `full_record_file_link` - ссылка на запись звонка
|
||||
- `campaign_name` - название кампании
|
||||
- `created_at` - дата создания записи
|
||||
- `updated_at` - дата последнего обновления
|
||||
- `tcm_topcrm_notification_name` - название уведомления TCM/TopCRM (название кампании)
|
||||
|
||||
### Подключение к PostgreSQL
|
||||
|
||||
|
||||
27
services/ingest-service/alembic/script.py.mako
Normal file
27
services/ingest-service/alembic/script.py.mako
Normal file
@@ -0,0 +1,27 @@
|
||||
"""${message}
|
||||
|
||||
Revision ID: ${up_revision}
|
||||
Revises: ${down_revision | comma,n}
|
||||
Create Date: ${create_date}
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
${imports if imports else ""}
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = ${repr(up_revision)}
|
||||
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
|
||||
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
|
||||
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
${upgrades if upgrades else "pass"}
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
${downgrades if downgrades else "pass"}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
"""Initial migration with call_events table
|
||||
"""INIT
|
||||
|
||||
Revision ID: a7e5c5ef6bc1
|
||||
Revision ID: 49dc6736230f
|
||||
Revises:
|
||||
Create Date: 2025-11-19 22:43:33.739763
|
||||
Create Date: 2025-12-06 13:21:32.669444
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
@@ -12,14 +12,13 @@ import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'a7e5c5ef6bc1'
|
||||
revision: str = '49dc6736230f'
|
||||
down_revision: Union[str, Sequence[str], None] = None
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
"""Upgrade schema."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table('call_events',
|
||||
sa.Column('id', sa.UUID(), nullable=False),
|
||||
@@ -36,17 +35,20 @@ def upgrade() -> None:
|
||||
sa.Column('clean_talk_time_duration', sa.Integer(), nullable=False),
|
||||
sa.Column('full_record_file_link', sa.String(), nullable=False),
|
||||
sa.Column('tcm_topcrm_notification_name', sa.String(), nullable=False),
|
||||
sa.Column('s3_key', sa.String(), nullable=True),
|
||||
sa.Column('processing_status', sa.String(), nullable=False),
|
||||
sa.Column('retries', sa.Integer(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False)
|
||||
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=True)
|
||||
op.create_index(op.f('ix_call_events_id'), 'call_events', ['id'], unique=False)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
"""Downgrade schema."""
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_index(op.f('ix_call_events_id'), table_name='call_events')
|
||||
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
|
||||
op.drop_table('call_events')
|
||||
# ### end Alembic commands ###
|
||||
|
||||
@@ -1,39 +1,79 @@
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import List
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
from pydantic import BaseModel, HttpUrl
|
||||
from pydantic import BaseModel, HttpUrl, Field, field_validator
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.infrastructure.database import get_db
|
||||
from app import crud
|
||||
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CallDirection(str, Enum):
|
||||
"""Направление звонка"""
|
||||
in_ = "in"
|
||||
out = "out"
|
||||
|
||||
|
||||
class UisCallEvent(BaseModel):
|
||||
eventType: str
|
||||
call_session_id: int
|
||||
direction: CallDirection
|
||||
employee_id: int
|
||||
employee_full_name: str
|
||||
contact_phone_number: str
|
||||
called_phone_number: str
|
||||
communication_group_name: str
|
||||
start_time: datetime
|
||||
finish_time: datetime
|
||||
talk_time_duration: int
|
||||
full_record_file_link: HttpUrl
|
||||
campaign_name: str
|
||||
"""Схема события звонка от UIS"""
|
||||
eventType: str = Field(..., description="Тип события")
|
||||
call_session_id: int = Field(..., gt=0, description="Уникальный ID сессии звонка")
|
||||
direction: CallDirection = Field(..., description="Направление звонка")
|
||||
employee_id: int = Field(..., gt=0, description="ID сотрудника")
|
||||
employee_full_name: str = Field(..., min_length=1, description="ФИО сотрудника")
|
||||
contact_phone_number: str = Field(..., description="Телефон контакта")
|
||||
called_phone_number: str = Field(..., description="Набранный телефон")
|
||||
communication_group_name: str = Field(..., description="Группа коммуникации")
|
||||
start_time: datetime = Field(..., description="Время начала звонка")
|
||||
finish_time: datetime = Field(..., description="Время окончания звонка")
|
||||
talk_time_duration: int = Field(..., ge=0, description="Длительность разговора (сек)")
|
||||
full_record_file_link: HttpUrl = Field(..., description="Ссылка на запись")
|
||||
campaign_name: str = Field(..., description="Название кампании")
|
||||
|
||||
@field_validator('finish_time')
|
||||
@classmethod
|
||||
def validate_finish_time(cls, v: datetime, info) -> datetime:
|
||||
"""Проверяем, что finish_time >= start_time"""
|
||||
if 'start_time' in info.data and v < info.data['start_time']:
|
||||
raise ValueError('finish_time must be greater than or equal to start_time')
|
||||
return v
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
@router.post("/webhook", status_code=204)
|
||||
async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None:
|
||||
"""Webhook для получения событий звонков от UIS"""
|
||||
async def create_call_event(
|
||||
callEvent: UisCallEvent,
|
||||
db: AsyncSession = Depends(get_db)
|
||||
) -> None:
|
||||
"""
|
||||
Webhook для получения событий звонков от UIS
|
||||
|
||||
**Обрабатывает события:**
|
||||
- Сохраняет событие звонка в базу данных
|
||||
- Проверяет уникальность call_session_id
|
||||
- Логирует успешную обработку
|
||||
|
||||
**Возможные ошибки:**
|
||||
- 409 Conflict: Событие с таким call_session_id уже существует
|
||||
- 422 Unprocessable Entity: Ошибка валидации данных
|
||||
- 500 Internal Server Error: Ошибка сервера/БД
|
||||
"""
|
||||
logger.info(
|
||||
f"Received webhook event",
|
||||
extra={
|
||||
"event_type": callEvent.eventType,
|
||||
"call_session_id": callEvent.call_session_id,
|
||||
"employee_id": callEvent.employee_id
|
||||
}
|
||||
)
|
||||
|
||||
# Преобразуем UisCallEvent в данные для CallEvent (БД)
|
||||
event_data = {
|
||||
@@ -53,5 +93,5 @@ async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(
|
||||
"clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение
|
||||
}
|
||||
|
||||
# Сохраняем в БД
|
||||
# Сохраняем в БД (исключения обрабатываются в exception handlers)
|
||||
await crud.create_call_event(db, event_data)
|
||||
18
services/ingest-service/app/config.py
Normal file
18
services/ingest-service/app/config.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import os
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
class Settings(BaseSettings):
|
||||
DATABASE_URL: str
|
||||
S3_ENDPOINT_URL: str
|
||||
AWS_ACCESS_KEY_ID: str
|
||||
AWS_SECRET_ACCESS_KEY: str
|
||||
AWS_REGION: str
|
||||
S3_BUCKET_NAME: str
|
||||
MAX_RETRIES: int
|
||||
RETRY_DELAY: int
|
||||
|
||||
# class Config:
|
||||
# env_file = ".env"
|
||||
# env_file_encoding = "utf-8"
|
||||
|
||||
settings = Settings()
|
||||
0
services/ingest-service/app/core/__init__.py
Normal file
0
services/ingest-service/app/core/__init__.py
Normal file
29
services/ingest-service/app/core/exceptions.py
Normal file
29
services/ingest-service/app/core/exceptions.py
Normal file
@@ -0,0 +1,29 @@
|
||||
"""Кастомные исключения для приложения"""
|
||||
|
||||
|
||||
class AppException(Exception):
|
||||
"""Базовое исключение приложения"""
|
||||
def __init__(self, message: str, details: dict = None):
|
||||
self.message = message
|
||||
self.details = details or {}
|
||||
super().__init__(self.message)
|
||||
|
||||
|
||||
class CallEventAlreadyExistsError(AppException):
|
||||
"""Событие звонка с таким call_session_id уже существует"""
|
||||
def __init__(self, call_session_id: int):
|
||||
super().__init__(
|
||||
message=f"Call event with session_id {call_session_id} already exists",
|
||||
details={"call_session_id": call_session_id}
|
||||
)
|
||||
|
||||
|
||||
class DatabaseError(AppException):
|
||||
"""Ошибка работы с базой данных"""
|
||||
pass
|
||||
|
||||
|
||||
class ValidationError(AppException):
|
||||
"""Ошибка валидации данных"""
|
||||
pass
|
||||
|
||||
@@ -1,11 +1,61 @@
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from asyncpg.exceptions import UniqueViolationError
|
||||
import logging
|
||||
|
||||
from app.models.call_event import CallEvent
|
||||
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent:
|
||||
"""Создать новое событие звонка в БД"""
|
||||
call_event = CallEvent(**event_data)
|
||||
db.add(call_event)
|
||||
await db.commit()
|
||||
await db.refresh(call_event)
|
||||
return call_event
|
||||
"""Создать новое событие звонка в БД
|
||||
|
||||
Args:
|
||||
db: Async сессия базы данных
|
||||
event_data: Данные события звонка
|
||||
|
||||
Returns:
|
||||
CallEvent: Созданное событие
|
||||
|
||||
Raises:
|
||||
CallEventAlreadyExistsError: Если событие с таким call_session_id уже существует
|
||||
DatabaseError: При других ошибках БД
|
||||
"""
|
||||
try:
|
||||
call_event = CallEvent(**event_data)
|
||||
db.add(call_event)
|
||||
await db.commit()
|
||||
await db.refresh(call_event)
|
||||
|
||||
logger.info(
|
||||
f"Call event created successfully",
|
||||
extra={
|
||||
"call_session_id": call_event.call_session_id,
|
||||
"employee_id": call_event.employee_id
|
||||
}
|
||||
)
|
||||
|
||||
return call_event
|
||||
|
||||
except IntegrityError as e:
|
||||
await db.rollback()
|
||||
|
||||
# Проверяем, является ли это ошибкой уникальности call_session_id
|
||||
if isinstance(e.orig, UniqueViolationError):
|
||||
call_session_id = event_data.get("call_session_id")
|
||||
logger.warning(
|
||||
f"Duplicate call event detected",
|
||||
extra={"call_session_id": call_session_id}
|
||||
)
|
||||
raise CallEventAlreadyExistsError(call_session_id)
|
||||
|
||||
# Другие ошибки целостности
|
||||
logger.error(f"Database integrity error: {str(e)}")
|
||||
raise DatabaseError(f"Failed to create call event: {str(e)}")
|
||||
|
||||
except Exception as e:
|
||||
await db.rollback()
|
||||
logger.error(f"Unexpected error creating call event: {str(e)}", exc_info=True)
|
||||
raise DatabaseError(f"Unexpected database error: {str(e)}")
|
||||
|
||||
@@ -2,13 +2,28 @@ import os
|
||||
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db")
|
||||
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "yes")
|
||||
|
||||
# Единый async движок для всего приложения
|
||||
engine = create_async_engine(DATABASE_URL, echo=True)
|
||||
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
# echo=True включается только в режиме DEBUG
|
||||
engine = create_async_engine(
|
||||
DATABASE_URL,
|
||||
echo=DEBUG,
|
||||
pool_size=10,
|
||||
max_overflow=20,
|
||||
pool_pre_ping=True,
|
||||
pool_recycle=3600,
|
||||
)
|
||||
|
||||
AsyncSessionLocal = async_sessionmaker(
|
||||
engine,
|
||||
class_=AsyncSession,
|
||||
expire_on_commit=False
|
||||
)
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
67
services/ingest-service/app/infrastructure/s3.py
Normal file
67
services/ingest-service/app/infrastructure/s3.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import aioboto3
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def download_and_upload_stream(url: str, key_name: str) -> bool:
|
||||
"""
|
||||
Скачивает аудиофайл по URL потоками и передаёт в upload_file_to_s3 для загрузки.
|
||||
"""
|
||||
try:
|
||||
logger.info(f"[INFO] Начало скачивания {url}")
|
||||
async with aiohttp.ClientSession() as http_session:
|
||||
async with http_session.get(url) as resp:
|
||||
resp.raise_for_status()
|
||||
|
||||
# читаем файл по частям и собираем в bytes
|
||||
chunks = []
|
||||
while True:
|
||||
chunk = await resp.content.read(1024 * 1024) # 1 МБ
|
||||
if not chunk:
|
||||
break
|
||||
chunks.append(chunk)
|
||||
file_bytes = b"".join(chunks)
|
||||
|
||||
logger.info(f"[INFO] Скачивание {url} завершено, размер {len(file_bytes)} байт")
|
||||
|
||||
# передаём в функцию загрузки с retry
|
||||
success = await upload_file_to_s3(file_bytes, key_name)
|
||||
return success
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[FAILED] Не удалось скачать или загрузить {url}: {e}")
|
||||
return False
|
||||
|
||||
async def upload_file_to_s3(file_bytes: bytes, key_name: str) -> bool:
|
||||
"""
|
||||
Асинхронно загружает файл в S3 с ретраями.
|
||||
Возвращает True при успехе, False при ошибке.
|
||||
"""
|
||||
session = aioboto3.Session()
|
||||
for attempt in range(1, settings.MAX_RETRIES + 1):
|
||||
try:
|
||||
logger.info(f"Попытка {attempt} загрузки {key_name} в S3")
|
||||
#logger.info(f"Параметры S3: {S3_ENDPOINT_URL}, {S3_BUCKET_NAME}, {AWS_REGION}")
|
||||
|
||||
async with session.client(
|
||||
"s3",
|
||||
endpoint_url=settings.S3_ENDPOINT_URL,
|
||||
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
|
||||
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
|
||||
region_name=settings.AWS_REGION
|
||||
) as s3:
|
||||
await s3.put_object(Bucket=settings.S3_BUCKET_NAME, Key=key_name, Body=file_bytes)
|
||||
logger.info(f"[OK] Файл {key_name} загружен в S3!")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.exception(f"[ERROR] Попытка {attempt} для {key_name} не удалась: {e}")
|
||||
if attempt < settings.MAX_RETRIES:
|
||||
await asyncio.sleep(settings.RETRY_DELAY)
|
||||
else:
|
||||
logger.error(f"[FAILED] Файл {key_name} не удалось загрузить после {settings.MAX_RETRIES} попыток")
|
||||
return False
|
||||
60
services/ingest-service/app/infrastructure/scheduler.py
Normal file
60
services/ingest-service/app/infrastructure/scheduler.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
|
||||
from app.models.call_event import CallEvent
|
||||
from app.infrastructure.s3 import download_and_upload_stream
|
||||
from app.infrastructure.database import get_db
|
||||
from app.config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BATCH_SIZE = 20
|
||||
MAX_RETRIES_CALL = 3
|
||||
async def fetch_pending_calls(session: AsyncSession):
|
||||
stmt = (
|
||||
select(CallEvent)
|
||||
.where(CallEvent.processing_status == "pending")
|
||||
.where(CallEvent.retries < MAX_RETRIES_CALL)
|
||||
.order_by(CallEvent.id)
|
||||
.limit(BATCH_SIZE)
|
||||
.with_for_update(skip_locked=True)
|
||||
)
|
||||
result = await session.execute(stmt)
|
||||
calls = result.scalars().all()
|
||||
return calls
|
||||
|
||||
async def process_pending_calls():
|
||||
logger.info("[INFO] Запуск обработки новых записей")
|
||||
|
||||
async for session in get_db():
|
||||
calls = await fetch_pending_calls(session)
|
||||
if not calls:
|
||||
logger.info("[INFO] Нет новых записей для обработки")
|
||||
return
|
||||
|
||||
# сразу увеличиваем retries и помечаем processing
|
||||
for call in calls:
|
||||
call.retries += 1
|
||||
call.processing_status = "processing"
|
||||
await session.commit()
|
||||
|
||||
# создаём задачи на скачивание/загрузку
|
||||
tasks = []
|
||||
for call in calls:
|
||||
key_name = f"audio/{call.id}.mp3"
|
||||
logger.info("Попытка скачать и загрузить запись %s", call.full_record_file_link)
|
||||
tasks.append(download_and_upload_stream(call.full_record_file_link, key_name))
|
||||
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# обновляем статус после обработки
|
||||
for call, result in zip(calls, results):
|
||||
if isinstance(result, Exception) or result is False:
|
||||
call.processing_status = "failed"
|
||||
else:
|
||||
call.processing_status = "done"
|
||||
call.s3_key = f"audio/{call.id}.mp3"
|
||||
|
||||
await session.commit()
|
||||
logger.info(f"Обработка пачки из {len(calls)} записей завершена")
|
||||
@@ -1,11 +1,102 @@
|
||||
from contextlib import asynccontextmanager
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, Request, status
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.concurrency import asynccontextmanager
|
||||
import logging
|
||||
from app.infrastructure.scheduler import process_pending_calls
|
||||
|
||||
from app.api.uis import router as uis_router
|
||||
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
from fastapi import FastAPI
|
||||
import asyncio
|
||||
from app.infrastructure.database import get_db
|
||||
|
||||
# Настройка логирования
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app:FastAPI):
|
||||
scheduler = AsyncIOScheduler()
|
||||
|
||||
scheduler.add_job(process_pending_calls, "interval", seconds = 10)
|
||||
|
||||
scheduler.start()
|
||||
yield
|
||||
await scheduler.shutdown(wait=True)
|
||||
|
||||
app = FastAPI(
|
||||
title="Ingest Service API",
|
||||
description="Микросервис для приема событий звонков",
|
||||
version="1.0.0",
|
||||
lifespan=lifespan
|
||||
)
|
||||
|
||||
# Exception handlers
|
||||
@app.exception_handler(CallEventAlreadyExistsError)
|
||||
async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError):
|
||||
"""Обработчик для дубликатов событий звонков"""
|
||||
logger.warning(f"Duplicate call event: {exc.message}", extra=exc.details)
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_409_CONFLICT,
|
||||
content={
|
||||
"error": "duplicate_call_event",
|
||||
"message": exc.message,
|
||||
"details": exc.details
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(DatabaseError)
|
||||
async def database_error_handler(request: Request, exc: DatabaseError):
|
||||
"""Обработчик для ошибок БД"""
|
||||
logger.error(f"Database error: {exc.message}", extra=exc.details)
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content={
|
||||
"error": "database_error",
|
||||
"message": "Internal database error occurred",
|
||||
"details": exc.details
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(ValidationError)
|
||||
async def validation_error_handler(request: Request, exc: ValidationError):
|
||||
"""Обработчик для ошибок валидации"""
|
||||
logger.warning(f"Validation error: {exc.message}", extra=exc.details)
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
content={
|
||||
"error": "validation_error",
|
||||
"message": exc.message,
|
||||
"details": exc.details
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(Exception)
|
||||
async def general_exception_handler(request: Request, exc: Exception):
|
||||
"""Общий обработчик для неперехваченных исключений"""
|
||||
logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
|
||||
return JSONResponse(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
content={
|
||||
"error": "internal_server_error",
|
||||
"message": "An unexpected error occurred"
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# Health check endpoint
|
||||
@app.get("/health", tags=["Health"])
|
||||
async def health_check():
|
||||
"""Проверка состояния сервиса"""
|
||||
return {"status": "healthy", "service": "ingest-service"}
|
||||
|
||||
|
||||
app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"])
|
||||
@@ -1,35 +0,0 @@
|
||||
from datetime import datetime
|
||||
from sqlalchemy import Column, Integer, String, DateTime, Enum as SQLEnum
|
||||
from app.infrastructure import Base
|
||||
import enum
|
||||
|
||||
|
||||
class CallDirectionEnum(str, enum.Enum):
|
||||
in_ = "in"
|
||||
out = "out"
|
||||
|
||||
|
||||
class CallEvent(Base):
|
||||
"""Модель для хранения событий звонков"""
|
||||
__tablename__ = "call_events"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
event_type = Column(String, nullable=False)
|
||||
call_session_id = Column(String, unique=True, index=True, nullable=False)
|
||||
direction = Column(SQLEnum(CallDirectionEnum), nullable=False)
|
||||
employee_id = Column(Integer, nullable=False, index=True)
|
||||
employee_full_name = Column(String, nullable=False)
|
||||
contact_phone_number = Column(String, nullable=False)
|
||||
called_phone_number = Column(String, nullable=False)
|
||||
communication_group_name = Column(String)
|
||||
start_time = Column(DateTime, nullable=False)
|
||||
finish_time = Column(DateTime, nullable=False)
|
||||
talk_time_duration = Column(Integer, nullable=False)
|
||||
full_record_file_link = Column(String, nullable=False)
|
||||
campaign_name = Column(String)
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<CallEvent(id={self.id}, call_session_id={self.call_session_id})>"
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
from sqlalchemy import Column, BigInteger, Integer, String, DateTime
|
||||
from sqlalchemy import Column, BigInteger, Integer, String, DateTime, Enum as SQLEnum
|
||||
from enum import Enum
|
||||
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
|
||||
import uuid
|
||||
from app.infrastructure.database import Base
|
||||
from enum import Enum as PyEnum
|
||||
|
||||
class CallEvent(Base):
|
||||
__tablename__ = "call_events"
|
||||
|
||||
id = Column(PG_UUID(as_uuid=True), primary_key=True, default=uuid.uuid4, index=True)
|
||||
call_session_id = Column(BigInteger, nullable=False, index=True)
|
||||
call_session_id = Column(BigInteger, nullable=False, unique=True, index=True)
|
||||
direction = Column(String, nullable=False)
|
||||
notification_mnemonic = Column(String, nullable=False)
|
||||
last_answered_employee_full_name = Column(String, nullable=True)
|
||||
@@ -20,3 +22,7 @@ class CallEvent(Base):
|
||||
clean_talk_time_duration = Column(Integer, nullable=False)
|
||||
full_record_file_link = Column(String, nullable=False)
|
||||
tcm_topcrm_notification_name = Column(String, nullable=False)
|
||||
|
||||
s3_key = Column(String, nullable=True)
|
||||
processing_status = Column(String, default="pending", nullable=False)
|
||||
retries = Column(Integer, default=0, nullable=False)
|
||||
35
services/ingest-service/docker-compose.yaml
Normal file
35
services/ingest-service/docker-compose.yaml
Normal file
@@ -0,0 +1,35 @@
|
||||
version: "3.8"
|
||||
|
||||
services:
|
||||
ingest-service:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
container_name: ingest-service
|
||||
env_file:
|
||||
- .env
|
||||
ports:
|
||||
- "8000:8000"
|
||||
networks:
|
||||
- call-review-network
|
||||
restart: unless-stopped
|
||||
# Запускаем миграции перед стартом приложения
|
||||
command: >
|
||||
sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"
|
||||
healthcheck:
|
||||
test:
|
||||
[
|
||||
"CMD",
|
||||
"python",
|
||||
"-c",
|
||||
"import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()",
|
||||
]
|
||||
interval: 30s
|
||||
timeout: 3s
|
||||
start_period: 10s
|
||||
retries: 3
|
||||
|
||||
networks:
|
||||
call-review-network:
|
||||
external: true
|
||||
name: call-review-network
|
||||
@@ -5,4 +5,6 @@ sqlalchemy==2.0.23
|
||||
asyncpg==0.29.0
|
||||
alembic==1.13.1
|
||||
python-dotenv==1.0.0
|
||||
|
||||
aioboto3==15.5.0
|
||||
apscheduler==3.11.1
|
||||
pydantic_settings==2.11.0
|
||||
Reference in New Issue
Block a user