Initial commit

2025-11-19 22:54:33 +07:00
commit 8f5b984598
18 changed files with 761 additions and 0 deletions


@@ -0,0 +1,7 @@
# Database
DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
# Application
APP_HOST=0.0.0.0
APP_PORT=8000


@@ -0,0 +1,11 @@
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]


@@ -0,0 +1,226 @@
# Ingest Service
A microservice for receiving and storing call events from UIS.
## Technology Stack
- Python 3.9
- FastAPI
- PostgreSQL
- SQLAlchemy (async)
- Docker & Docker Compose
## Project Structure
```
ingest-service/
├── app/
│   ├── __init__.py
│   ├── main.py          # Application entry point
│   ├── database.py      # Database configuration
│   ├── models.py        # SQLAlchemy models
│   ├── crud.py          # CRUD operations
│   └── api/
│       ├── __init__.py
│       └── uis.py       # API endpoints for UIS
├── docker-compose.yaml
├── Dockerfile
├── requirements.txt
└── .env.example
```
## Quick Start
### 1. Run with Docker Compose (recommended)
```bash
# Start all services (PostgreSQL + application)
docker-compose up -d

# Tail the application logs
docker-compose logs -f app

# Stop the services
docker-compose down
```
The application will be available at http://localhost:8000
Swagger documentation: http://localhost:8000/docs
### 2. Local development setup
```bash
# Create a virtual environment
python -m venv venv
source venv/bin/activate   # Linux/macOS
# or
venv\Scripts\activate      # Windows

# Install dependencies
pip install -r requirements.txt

# Start only PostgreSQL via Docker
docker-compose up -d postgres

# Create the .env file
cp .env.example .env

# Run the application
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
```
## API Endpoints
### POST /v1/uis/webhook
Webhook for receiving call events from UIS.
**Example request:**
```json
{
  "eventType": "call_completed",
  "call_session_id": "12345-abcde",
  "direction": "in",
  "employee_id": 100,
  "employee_full_name": "Ivanov Ivan Ivanovich",
  "contact_phone_number": "+79001234567",
  "called_phone_number": "+78001234567",
  "communication_group_name": "Sales",
  "start_time": "2024-01-15T10:30:00",
  "finish_time": "2024-01-15T10:35:00",
  "talk_time_duration": 300,
  "full_record_file_link": "https://example.com/records/12345.mp3",
  "campaign_name": "Winter campaign"
}
```
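Once the stack is up, the webhook can be exercised with a short script. This is a sketch: the payload values mirror the illustrative example above, and it assumes the service is listening on localhost:8000 as configured in `.env.example`.

```python
import json

# Payload matching the example request above (values are illustrative)
payload = {
    "eventType": "call_completed",
    "call_session_id": "12345-abcde",
    "direction": "in",
    "employee_id": 100,
    "employee_full_name": "Ivanov Ivan Ivanovich",
    "contact_phone_number": "+79001234567",
    "called_phone_number": "+78001234567",
    "communication_group_name": "Sales",
    "start_time": "2024-01-15T10:30:00",
    "finish_time": "2024-01-15T10:35:00",
    "talk_time_duration": 300,
    "full_record_file_link": "https://example.com/records/12345.mp3",
    "campaign_name": "Winter campaign",
}

body = json.dumps(payload)

# To actually deliver it, uncomment the lines below (requires the service
# to be running and `pip install httpx`):
# import httpx
# resp = httpx.post("http://localhost:8000/v1/uis/webhook", content=body,
#                   headers={"Content-Type": "application/json"})
```

On the first delivery the endpoint responds with 201; resending the same `call_session_id` returns 400, since the handler rejects duplicates.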
### GET /v1/uis/events
Returns a list of all call events.
**Query parameters:**
- `skip` - number of records to skip (pagination)
- `limit` - maximum number of records to return (default 100)
### GET /v1/uis/events/{call_session_id}
Returns a single call event by its session_id.
### GET /v1/uis/events/employee/{employee_id}
Returns all calls for a given employee.
**Query parameters:**
- `skip` - number of records to skip
- `limit` - maximum number of records to return
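The list endpoints use plain offset pagination: to fetch zero-based page `n`, pass `skip = n * limit`. A tiny helper (hypothetical, not part of the service) shows the mapping:

```python
def page_params(page: int, limit: int = 100) -> dict:
    """Query parameters for a zero-based page: page 2 with limit 100 -> skip=200."""
    return {"skip": page * limit, "limit": limit}

# e.g. GET /v1/uis/events?skip=200&limit=100 fetches the third page
params = page_params(2)
```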
## Database
### CallEvent data model
The `call_events` table has the following columns:
- `id` - unique identifier (autoincrement)
- `event_type` - event type
- `call_session_id` - unique call session ID
- `direction` - call direction (in/out)
- `employee_id` - employee ID
- `employee_full_name` - employee's full name
- `contact_phone_number` - contact phone number
- `called_phone_number` - dialed phone number
- `communication_group_name` - communication group name
- `start_time` - call start time
- `finish_time` - call end time
- `talk_time_duration` - talk duration (in seconds)
- `full_record_file_link` - link to the call recording
- `campaign_name` - campaign name
- `created_at` - record creation timestamp
- `updated_at` - last update timestamp
### Connecting to PostgreSQL
To connect to the database:
```bash
# Via Docker
docker exec -it ingest-postgres psql -U postgres -d ingest_db

# Or directly (if PostgreSQL runs locally)
psql -h localhost -p 5432 -U postgres -d ingest_db
```
Password: `postgres`
### Useful SQL commands
```sql
-- List all tables
\dt

-- Show the table structure
\d call_events

-- Show all rows
SELECT * FROM call_events;

-- Call count per employee
SELECT employee_full_name, COUNT(*) AS call_count
FROM call_events
GROUP BY employee_full_name;
```
## Development
### Migrations (Alembic)
Working with database migrations:
```bash
# Initialize Alembic (already done)
alembic init alembic

# Create a new migration
alembic revision --autogenerate -m "describe your changes"

# Apply migrations
alembic upgrade head

# Roll back the last migration
alembic downgrade -1
```
### Testing
```bash
# Install test dependencies
pip install pytest pytest-asyncio httpx

# Run the tests (once they exist)
pytest
```
## Environment Variables
- `DATABASE_URL` - PostgreSQL connection string
- `APP_HOST` - host the application binds to
- `APP_PORT` - port the application listens on
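`DATABASE_URL` uses the async `postgresql+asyncpg://` scheme; `alembic/env.py` derives a synchronous `postgresql://` URL from it by stripping the driver marker, so migrations can run over psycopg2. In sketch form:

```python
import os

def sync_url(async_url: str) -> str:
    """Strip the asyncpg driver marker so a sync driver (psycopg2) is used."""
    return async_url.replace("postgresql+asyncpg://", "postgresql://")

url = os.getenv(
    "DATABASE_URL",
    "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db",
)
alembic_url = sync_url(url)
```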
## Troubleshooting
### Database connection errors
If the application cannot connect to the database:
1. Check that PostgreSQL is running: `docker-compose ps`
2. Check the logs: `docker-compose logs postgres`
3. Verify that `DATABASE_URL` is set correctly in the environment
### Table creation errors
If the tables are not created automatically:
1. Connect to the database
2. Drop the tables: `DROP TABLE IF EXISTS call_events CASCADE;`
3. Restart the application
## License
MIT


@@ -0,0 +1,148 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
# The URL is overridden in env.py from environment variables
sqlalchemy.url = postgresql://postgres:postgres@localhost:5432/ingest_db
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S


@@ -0,0 +1,76 @@
from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

import os
from dotenv import load_dotenv

load_dotenv()

from app.core.database import Base
# Important: import every model so it gets registered in Base.metadata
from app.models import CallEvent

config = context.config

# Override the URL from environment variables
database_url = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/ingest_db")
# Convert the asyncpg URL into a psycopg2 one
sync_database_url = database_url.replace("postgresql+asyncpg://", "postgresql://")
config.set_main_option("sqlalchemy.url", sync_database_url)

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()


@@ -0,0 +1,52 @@
"""Initial migration with call_events table
Revision ID: a7e5c5ef6bc1
Revises:
Create Date: 2025-11-19 22:43:33.739763
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'a7e5c5ef6bc1'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('call_events',
sa.Column('id', sa.UUID(), nullable=False),
sa.Column('call_session_id', sa.BigInteger(), nullable=False),
sa.Column('direction', sa.String(), nullable=False),
sa.Column('notification_mnemonic', sa.String(), nullable=False),
sa.Column('last_answered_employee_full_name', sa.String(), nullable=True),
sa.Column('employee_id', sa.Integer(), nullable=True),
sa.Column('finish_time', sa.Integer(), nullable=False),
sa.Column('total_time_duration', sa.Integer(), nullable=False),
sa.Column('wait_time_duration', sa.Integer(), nullable=False),
sa.Column('total_wait_time_duration', sa.Integer(), nullable=False),
sa.Column('talk_time_duration', sa.Integer(), nullable=False),
sa.Column('clean_talk_time_duration', sa.Integer(), nullable=False),
sa.Column('full_record_file_link', sa.String(), nullable=False),
sa.Column('tcm_topcrm_notification_name', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False)
op.create_index(op.f('ix_call_events_id'), 'call_events', ['id'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_call_events_id'), table_name='call_events')
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
op.drop_table('call_events')
# ### end Alembic commands ###


@@ -0,0 +1,118 @@
from datetime import datetime
from enum import Enum
from typing import List

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, HttpUrl
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app import crud


class CallDirection(str, Enum):
    in_ = "in"
    out = "out"


class UisCallEvent(BaseModel):
    eventType: str
    call_session_id: str
    direction: CallDirection
    employee_id: int
    employee_full_name: str
    contact_phone_number: str
    called_phone_number: str
    communication_group_name: str
    start_time: datetime
    finish_time: datetime
    talk_time_duration: int
    full_record_file_link: HttpUrl
    campaign_name: str

    class Config:
        from_attributes = True


class CallEventResponse(BaseModel):
    id: int
    event_type: str
    call_session_id: str
    direction: str
    employee_id: int
    employee_full_name: str
    contact_phone_number: str
    called_phone_number: str
    communication_group_name: str
    start_time: datetime
    finish_time: datetime
    talk_time_duration: int
    full_record_file_link: str
    campaign_name: str
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True


router = APIRouter()


@router.post("/webhook", response_model=CallEventResponse, status_code=201)
async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)):
    """Webhook for receiving call events from UIS."""
    # Check whether an event with this call_session_id already exists
    existing_event = await crud.get_call_event_by_session_id(db, callEvent.call_session_id)
    if existing_event:
        raise HTTPException(status_code=400, detail="Call event with this session_id already exists")

    # Convert the Pydantic model into a dict for the database
    call_event_data = {
        "event_type": callEvent.eventType,
        "call_session_id": callEvent.call_session_id,
        "direction": callEvent.direction,
        "employee_id": callEvent.employee_id,
        "employee_full_name": callEvent.employee_full_name,
        "contact_phone_number": callEvent.contact_phone_number,
        "called_phone_number": callEvent.called_phone_number,
        "communication_group_name": callEvent.communication_group_name,
        "start_time": callEvent.start_time,
        "finish_time": callEvent.finish_time,
        "talk_time_duration": callEvent.talk_time_duration,
        "full_record_file_link": str(callEvent.full_record_file_link),
        "campaign_name": callEvent.campaign_name,
    }
    db_call_event = await crud.create_call_event(db, call_event_data)
    return db_call_event


@router.get("/events", response_model=List[CallEventResponse])
async def get_call_events(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db)
):
    """Return a list of all call events."""
    events = await crud.get_all_call_events(db, skip=skip, limit=limit)
    return events


@router.get("/events/{call_session_id}", response_model=CallEventResponse)
async def get_call_event(call_session_id: str, db: AsyncSession = Depends(get_db)):
    """Return a call event by its session_id."""
    event = await crud.get_call_event_by_session_id(db, call_session_id)
    if not event:
        raise HTTPException(status_code=404, detail="Call event not found")
    return event


@router.get("/events/employee/{employee_id}", response_model=List[CallEventResponse])
async def get_employee_call_events(
    employee_id: int,
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db)
):
    """Return all calls for a given employee."""
    events = await crud.get_call_events_by_employee(db, employee_id, skip=skip, limit=limit)
    return events


@@ -0,0 +1,22 @@
import os

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from dotenv import load_dotenv

load_dotenv()

# Alembic uses a synchronous engine with psycopg2
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db")
# Convert the asyncpg URL into a psycopg2 one for the synchronous engine
SYNC_DATABASE_URL = DATABASE_URL.replace("postgresql+asyncpg://", "postgresql://")

engine = create_engine(SYNC_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


@@ -0,0 +1,11 @@
from fastapi import FastAPI

from app.api.uis import router as uis_router

app = FastAPI(
    title="Ingest Service API",
    description="Microservice for receiving call events",
    version="1.0.0",
)

app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"])


@@ -0,0 +1,35 @@
import enum
from datetime import datetime

from sqlalchemy import Column, Integer, String, DateTime, Enum as SQLEnum

from app.infrastructure import Base


class CallDirectionEnum(str, enum.Enum):
    in_ = "in"
    out = "out"


class CallEvent(Base):
    """Model for storing call events."""
    __tablename__ = "call_events"

    id = Column(Integer, primary_key=True, index=True)
    event_type = Column(String, nullable=False)
    call_session_id = Column(String, unique=True, index=True, nullable=False)
    direction = Column(SQLEnum(CallDirectionEnum), nullable=False)
    employee_id = Column(Integer, nullable=False, index=True)
    employee_full_name = Column(String, nullable=False)
    contact_phone_number = Column(String, nullable=False)
    called_phone_number = Column(String, nullable=False)
    communication_group_name = Column(String)
    start_time = Column(DateTime, nullable=False)
    finish_time = Column(DateTime, nullable=False)
    talk_time_duration = Column(Integer, nullable=False)
    full_record_file_link = Column(String, nullable=False)
    campaign_name = Column(String)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f"<CallEvent(id={self.id}, call_session_id={self.call_session_id})>"


@@ -0,0 +1,4 @@
from app.models.call_event import CallEvent
__all__ = ["CallEvent"]


@@ -0,0 +1,20 @@
from sqlalchemy import Column, BigInteger, Integer, UUID, String

from app.core.database import Base


class CallEvent(Base):
    __tablename__ = "call_events"

    id = Column(UUID, primary_key=True, index=True)
    call_session_id = Column(BigInteger, nullable=False, index=True)
    direction = Column(String, nullable=False)
    notification_mnemonic = Column(String, nullable=False)
    last_answered_employee_full_name = Column(String, nullable=True)
    employee_id = Column(Integer, nullable=True)
    finish_time = Column(Integer, nullable=False)
    total_time_duration = Column(Integer, nullable=False)
    wait_time_duration = Column(Integer, nullable=False)
    total_wait_time_duration = Column(Integer, nullable=False)
    talk_time_duration = Column(Integer, nullable=False)
    clean_talk_time_duration = Column(Integer, nullable=False)
    full_record_file_link = Column(String, nullable=False)
    tcm_topcrm_notification_name = Column(String, nullable=False)


@@ -0,0 +1,8 @@
fastapi==0.121.2
uvicorn==0.38.0
pydantic==2.12.4
sqlalchemy==2.0.23
asyncpg==0.29.0
psycopg2-binary==2.9.9  # sync driver used by Alembic and the sync engine
alembic==1.13.1
python-dotenv==1.0.0