Compare commits

6 Commits

15 changed files with 364 additions and 72 deletions

ansible/playbook.yml Normal file
View File

@@ -0,0 +1,46 @@
---
- hosts: dev
become: yes
become_method: sudo
roles:
- docker
tasks:
- name: Allow port 22 via UFW
ufw:
rule: allow
port: "22"
proto: tcp
state: enabled
- name: Allow port 80 via UFW
ufw:
rule: allow
port: "80"
proto: tcp
state: enabled
- name: Copy repository to server
copy:
src: "{{ item.src }}"
dest: "{{ item.dest }}"
loop:
- {
src: "{{ playbook_dir }}/../infra",
dest: "/home/{{ ansible_user }}/call-review-platform/infra",
}
- {
src: "{{ playbook_dir }}/../services",
dest: "/home/{{ ansible_user }}/call-review-platform/services",
}
- name: "Deploy infra"
tags: deploy_infra
community.docker.docker_compose_v2:
project_src: "/home/{{ ansible_user }}/call-review-platform/infra/infra"
state: present
recreate: always
- name: "Deploy ingest service"
tags: deploy_ingest
community.docker.docker_compose_v2:
project_src: "/home/{{ ansible_user }}/call-review-platform/services/services/ingest-service"
state: present
build: always
recreate: always

View File

@@ -0,0 +1,31 @@
---
- name: Install required packages
apt:
name:
- apt-transport-https
- ca-certificates
- curl
- gnupg
state: present
update_cache: yes
- name: Add Docker GPG key
apt_key:
url: https://download.docker.com/linux/ubuntu/gpg
state: present
- name: Add Docker repository
apt_repository:
repo: "deb [arch=amd64] https://download.docker.com/linux/ubuntu focal stable"
state: present
- name: Install Docker and Compose plugin
apt:
name:
- docker-ce
- docker-ce-cli
- containerd.io
- docker-buildx-plugin
- docker-compose-plugin
state: present
update_cache: yes

View File

@@ -1,6 +1,15 @@
version: "3.8"
services: services:
nginx:
image: nginx:latest
container_name: nginx
ports:
- "80:80"
volumes:
- ./nginx/conf.d:/etc/nginx/conf.d:ro
depends_on:
- minio
networks:
- call-review-network
postgres: postgres:
image: postgres:15-alpine image: postgres:15-alpine
container_name: ingest-postgres container_name: ingest-postgres
@@ -8,8 +17,6 @@ services:
POSTGRES_DB: ingest_db POSTGRES_DB: ingest_db
POSTGRES_USER: postgres POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres POSTGRES_PASSWORD: postgres
ports:
- "5432:5432"
volumes: volumes:
- postgres_data:/var/lib/postgresql/data - postgres_data:/var/lib/postgresql/data
healthcheck: healthcheck:
@@ -19,9 +26,20 @@ services:
retries: 5 retries: 5
networks: networks:
- call-review-network - call-review-network
minio:
image: minio/minio
container_name: minio
environment:
MINIO_ROOT_USER: admin
MINIO_ROOT_PASSWORD: password123
command: server /data --console-address ":9001"
volumes:
- minio_data:/data
networks:
- call-review-network
volumes: volumes:
postgres_data: postgres_data:
minio_data:
networks: networks:
call-review-network: call-review-network:

View File

@@ -0,0 +1,12 @@
server {
listen 80;
server_name crs-ingest-service.petrovskiy.ru;
location / {
proxy_pass http://ingest-service:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}

View File

@@ -0,0 +1,33 @@
server {
listen 80;
server_name s3.bikmeefftest.ru;
location / {
proxy_pass http://minio:9000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
server {
listen 80;
server_name crs-minio.petrovskiy.ru;
location / {
proxy_pass http://minio:9001;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}

View File

@@ -40,38 +40,50 @@ ingest-service/
 ### 1. Running with Docker Compose (recommended)
+> ⚠️ **IMPORTANT**: Start the infrastructure (PostgreSQL) first, since it creates the shared `call-review-network` network, and only then start the microservice.
 **Step 1: Start the infrastructure (PostgreSQL)**
 ```bash
 # From the project root
-cd infra
-docker-compose up -d
-# Check that PostgreSQL is running
-docker-compose ps
+docker-compose -f infra/docker-compose.yaml up -d
+# Check that PostgreSQL is running and the network has been created
+docker network ls | grep call-review
+docker ps --filter "name=ingest-postgres"
 ```
 **Step 2: Start the ingest-service microservice**
 ```bash
-# From the microservice directory
-cd ../services/ingest-service
-docker-compose up -d
-# Check the logs
-docker-compose logs -f
+# From the project root
+docker-compose -f services/ingest-service/docker-compose.yaml up -d
+# Check the logs (you will see the migrations being applied and the service starting)
+docker-compose -f services/ingest-service/docker-compose.yaml logs -f
 # Check the status
-docker-compose ps
+docker ps --filter "name=ingest"
+```
+**Verify it works:**
+```bash
+# Health check
+curl http://localhost:8000/health
+# Swagger UI
+open http://localhost:8000/docs
 ```
 **Stopping the services:**
 ```bash
-# Stop the microservice
-cd services/ingest-service
-docker-compose down
-# Stop the infrastructure
-cd ../../infra
-docker-compose down
+# Stop the microservice (from the project root)
+docker-compose -f services/ingest-service/docker-compose.yaml down
+# Stop the infrastructure (WARNING: this stops PostgreSQL!)
+docker-compose -f infra/docker-compose.yaml down
+# Or stop everything at once
+docker-compose -f services/ingest-service/docker-compose.yaml down && \
+docker-compose -f infra/docker-compose.yaml down
 ```
 The application will be available at: http://localhost:8000

View File

@@ -1,8 +1,8 @@
"""Initial migration with call_events table """INIT
Revision ID: a7e5c5ef6bc1 Revision ID: 49dc6736230f
Revises: Revises:
Create Date: 2025-11-19 22:43:33.739763 Create Date: 2025-12-06 13:21:32.669444
""" """
from typing import Sequence, Union from typing import Sequence, Union
@@ -12,14 +12,13 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision: str = 'a7e5c5ef6bc1' revision: str = '49dc6736230f'
down_revision: Union[str, Sequence[str], None] = None down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None: def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.create_table('call_events', op.create_table('call_events',
sa.Column('id', sa.UUID(), nullable=False), sa.Column('id', sa.UUID(), nullable=False),
@@ -36,17 +35,20 @@ def upgrade() -> None:
sa.Column('clean_talk_time_duration', sa.Integer(), nullable=False), sa.Column('clean_talk_time_duration', sa.Integer(), nullable=False),
sa.Column('full_record_file_link', sa.String(), nullable=False), sa.Column('full_record_file_link', sa.String(), nullable=False),
sa.Column('tcm_topcrm_notification_name', sa.String(), nullable=False), sa.Column('tcm_topcrm_notification_name', sa.String(), nullable=False),
sa.Column('s3_key', sa.String(), nullable=True),
sa.Column('processing_status', sa.String(), nullable=False),
sa.Column('retries', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id') sa.PrimaryKeyConstraint('id')
) )
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False) op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=True)
op.create_index(op.f('ix_call_events_id'), 'call_events', ['id'], unique=False) op.create_index(op.f('ix_call_events_id'), 'call_events', ['id'], unique=False)
# ### end Alembic commands ### # ### end Alembic commands ###
def downgrade() -> None: def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_call_events_id'), table_name='call_events') op.drop_index(op.f('ix_call_events_id'), table_name='call_events')
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events') op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
op.drop_table('call_events') op.drop_table('call_events')
# ### end Alembic commands ### # ### end Alembic commands ###

View File

@@ -1,33 +0,0 @@
"""add_unique_constraint_to_call_session_id
Revision ID: 9163176d6848
Revises: a7e5c5ef6bc1
Create Date: 2025-11-20 23:28:40.770696
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '9163176d6848'
down_revision: Union[str, Sequence[str], None] = 'a7e5c5ef6bc1'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=True)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False)
# ### end Alembic commands ###
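
Taken together with the rewritten initial revision above, this deletion squashes the old two-step history (`a7e5c5ef6bc1` plus this unique-index follow-up) into the single revision `49dc6736230f`. A database that already ran the old revisions will fail `alembic upgrade head`, because its `alembic_version` row points at a revision the scripts directory no longer contains. One way to reconcile such a database, sketched below under the assumption that its schema already matches the new revision (including the `s3_key`, `processing_status` and `retries` columns) and that `alembic.ini` sits in the service root:

```python
# Sketch only: re-point alembic_version at the squashed revision without running DDL.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumes this is run from services/ingest-service

# purge=True clears the stale version row (9163176d6848) before stamping,
# which is needed because that revision no longer exists on disk.
command.stamp(cfg, "49dc6736230f", purge=True)
```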

View File

@@ -0,0 +1,18 @@
import os
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str
S3_ENDPOINT_URL: str
AWS_ACCESS_KEY_ID: str
AWS_SECRET_ACCESS_KEY: str
AWS_REGION: str
S3_BUCKET_NAME: str
MAX_RETRIES: int
RETRY_DELAY: int
# class Config:
# env_file = ".env"
# env_file_encoding = "utf-8"
settings = Settings()
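
Because the `env_file`-based `Config` is left commented out, `Settings()` reads every field from the process environment, which is exactly what the service's compose file now injects via `env_file: .env`. A minimal sketch of how that resolution behaves, using purely illustrative values rather than the real deployment secrets:

```python
import os

# Hypothetical values for illustration only; in the deployment they come from
# the service's .env file through docker-compose's env_file.
os.environ.update({
    "DATABASE_URL": "postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db",
    "S3_ENDPOINT_URL": "http://minio:9000",
    "AWS_ACCESS_KEY_ID": "admin",
    "AWS_SECRET_ACCESS_KEY": "password123",
    "AWS_REGION": "us-east-1",
    "S3_BUCKET_NAME": "call-records",
    "MAX_RETRIES": "3",
    "RETRY_DELAY": "5",
})

# The module instantiates Settings() at import time, so the environment must be
# populated first; any missing field raises a validation error immediately.
from app.config import settings

print(settings.S3_ENDPOINT_URL)
print(type(settings.MAX_RETRIES), settings.MAX_RETRIES)  # <class 'int'> 3, coerced from the string
```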

View File

@@ -0,0 +1,67 @@
import aioboto3
import asyncio
import logging
import os
import aiohttp
from app.config import settings
logger = logging.getLogger(__name__)
async def download_and_upload_stream(url: str, key_name: str) -> bool:
"""
Скачивает аудиофайл по URL потоками и передаёт в upload_file_to_s3 для загрузки.
"""
try:
logger.info(f"[INFO] Начало скачивания {url}")
async with aiohttp.ClientSession() as http_session:
async with http_session.get(url) as resp:
resp.raise_for_status()
# читаем файл по частям и собираем в bytes
chunks = []
while True:
chunk = await resp.content.read(1024 * 1024) # 1 МБ
if not chunk:
break
chunks.append(chunk)
file_bytes = b"".join(chunks)
logger.info(f"[INFO] Скачивание {url} завершено, размер {len(file_bytes)} байт")
# передаём в функцию загрузки с retry
success = await upload_file_to_s3(file_bytes, key_name)
return success
except Exception as e:
logger.error(f"[FAILED] Не удалось скачать или загрузить {url}: {e}")
return False
async def upload_file_to_s3(file_bytes: bytes, key_name: str) -> bool:
"""
Асинхронно загружает файл в S3 с ретраями.
Возвращает True при успехе, False при ошибке.
"""
session = aioboto3.Session()
for attempt in range(1, settings.MAX_RETRIES + 1):
try:
logger.info(f"Попытка {attempt} загрузки {key_name} в S3")
#logger.info(f"Параметры S3: {S3_ENDPOINT_URL}, {S3_BUCKET_NAME}, {AWS_REGION}")
async with session.client(
"s3",
endpoint_url=settings.S3_ENDPOINT_URL,
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_REGION
) as s3:
await s3.put_object(Bucket=settings.S3_BUCKET_NAME, Key=key_name, Body=file_bytes)
logger.info(f"[OK] Файл {key_name} загружен в S3!")
return True
except Exception as e:
logger.exception(f"[ERROR] Попытка {attempt} для {key_name} не удалась: {e}")
if attempt < settings.MAX_RETRIES:
await asyncio.sleep(settings.RETRY_DELAY)
else:
logger.error(f"[FAILED] Файл {key_name} не удалось загрузить после {settings.MAX_RETRIES} попыток")
return False
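
For a one-off manual check of the download-and-upload path outside the scheduler, a small driver like the one below works; it assumes the same settings environment variables are exported, and the URL and object key are placeholders:

```python
import asyncio
import logging

from app.infrastructure.s3 import download_and_upload_stream

logging.basicConfig(level=logging.INFO)

async def main() -> None:
    # Placeholder source URL and S3 key, purely for illustration.
    ok = await download_and_upload_stream(
        "https://example.com/sample-call.mp3",
        "audio/manual-test.mp3",
    )
    print("uploaded" if ok else "failed")

if __name__ == "__main__":
    asyncio.run(main())
```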

View File

@@ -0,0 +1,60 @@
import asyncio
import logging
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from app.models.call_event import CallEvent
from app.infrastructure.s3 import download_and_upload_stream
from app.infrastructure.database import get_db
from app.config import settings
logger = logging.getLogger(__name__)
BATCH_SIZE = 20
MAX_RETRIES_CALL = 3
async def fetch_pending_calls(session: AsyncSession):
stmt = (
select(CallEvent)
.where(CallEvent.processing_status == "pending")
.where(CallEvent.retries < MAX_RETRIES_CALL)
.order_by(CallEvent.id)
.limit(BATCH_SIZE)
.with_for_update(skip_locked=True)
)
result = await session.execute(stmt)
calls = result.scalars().all()
return calls
async def process_pending_calls():
logger.info("[INFO] Запуск обработки новых записей")
async for session in get_db():
calls = await fetch_pending_calls(session)
if not calls:
logger.info("[INFO] Нет новых записей для обработки")
return
# сразу увеличиваем retries и помечаем processing
for call in calls:
call.retries += 1
call.processing_status = "processing"
await session.commit()
# создаём задачи на скачивание/загрузку
tasks = []
for call in calls:
key_name = f"audio/{call.id}.mp3"
logger.info("Попытка скачать и загрузить запись %s", call.full_record_file_link)
tasks.append(download_and_upload_stream(call.full_record_file_link, key_name))
results = await asyncio.gather(*tasks, return_exceptions=True)
# обновляем статус после обработки
for call, result in zip(calls, results):
if isinstance(result, Exception) or result is False:
call.processing_status = "failed"
else:
call.processing_status = "done"
call.s3_key = f"audio/{call.id}.mp3"
await session.commit()
logger.info(f"Обработка пачки из {len(calls)} записей завершена")

View File

@@ -1,10 +1,16 @@
+from contextlib import asynccontextmanager
 from fastapi import FastAPI, Request, status
 from fastapi.responses import JSONResponse
+from fastapi.concurrency import asynccontextmanager
 import logging
+from app.infrastructure.scheduler import process_pending_calls
 from app.api.uis import router as uis_router
 from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from fastapi import FastAPI
+import asyncio
+from app.infrastructure.database import get_db
 # Logging configuration
 logging.basicConfig(
@@ -13,13 +19,23 @@ logging.basicConfig(
 )
 logger = logging.getLogger(__name__)
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    scheduler = AsyncIOScheduler()
+    scheduler.add_job(process_pending_calls, "interval", seconds=10)
+    scheduler.start()
+    yield
+    scheduler.shutdown(wait=True)
 app = FastAPI(
     title="Ingest Service API",
     description="Microservice for receiving call events",
     version="1.0.0",
+    lifespan=lifespan
 )
 # Exception handlers
 @app.exception_handler(CallEventAlreadyExistsError)
 async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError):

View File

@@ -1,7 +1,9 @@
-from sqlalchemy import Column, BigInteger, Integer, String, DateTime
+from sqlalchemy import Column, BigInteger, Integer, String, DateTime, Enum as SQLEnum
+from enum import Enum
 from sqlalchemy.dialects.postgresql import UUID as PG_UUID
 import uuid
 from app.infrastructure.database import Base
+from enum import Enum as PyEnum
 class CallEvent(Base):
     __tablename__ = "call_events"
@@ -20,3 +22,7 @@ class CallEvent(Base):
     clean_talk_time_duration = Column(Integer, nullable=False)
     full_record_file_link = Column(String, nullable=False)
     tcm_topcrm_notification_name = Column(String, nullable=False)
+    s3_key = Column(String, nullable=True)
+    processing_status = Column(String, default="pending", nullable=False)
+    retries = Column(Integer, default=0, nullable=False)

View File

@@ -6,12 +6,8 @@ services:
       context: .
       dockerfile: Dockerfile
     container_name: ingest-service
-    environment:
-      # Connection to PostgreSQL from infra/docker-compose.yaml
-      DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db
-      APP_HOST: 0.0.0.0
-      APP_PORT: 8000
-      DEBUG: "False"
+    env_file:
+      - .env
     ports:
       - "8000:8000"
     networks:
@@ -21,7 +17,13 @@ services:
     command: >
       sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"
     healthcheck:
-      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()"]
+      test:
+        [
+          "CMD",
+          "python",
+          "-c",
+          "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()",
+        ]
       interval: 30s
       timeout: 3s
       start_period: 10s

View File

@@ -5,4 +5,6 @@ sqlalchemy==2.0.23
 asyncpg==0.29.0
 alembic==1.13.1
 python-dotenv==1.0.0
+aioboto3==15.5.0
+apscheduler==3.11.1
+pydantic_settings==2.11.0