Compare commits
6 commits: 3d4430134b...main

| SHA1 |
|---|
| 6b09daab61 |
| a5d3a72d0c |
| 3cc360c8b9 |
| f72451cdd8 |
| bceb06b5b6 |
| 6862ea5760 |
ansible/playbook.yml (new file, 46 lines)
@@ -0,0 +1,46 @@
+---
+- hosts: dev
+  become: yes
+  become_method: sudo
+  roles:
+    - docker
+  tasks:
+    - name: Enable 22 port ufw
+      ufw:
+        rule: allow
+        port: "22"
+        proto: tcp
+        state: enabled
+
+    - name: Enable 80 port ufw
+      ufw:
+        rule: allow
+        port: "80"
+        proto: tcp
+        state: enabled
+    - name: Copy repository to server
+      copy:
+        src: "{{ item.src }}"
+        dest: "{{ item.dest }}"
+      loop:
+        - {
+            src: "{{ playbook_dir }}/../infra",
+            dest: "/home/{{ ansible_user }}/call-review-platform/infra",
+          }
+        - {
+            src: "{{ playbook_dir }}/../services",
+            dest: "/home/{{ ansible_user }}/call-review-platform/services",
+          }
+    - name: "Deploy infra"
+      tags: deploy_infra
+      community.docker.docker_compose_v2:
+        project_src: "/home/{{ ansible_user }}/call-review-platform/infra/infra"
+        state: present
+        recreate: always
+    - name: "Deploy ingest service"
+      tags: deploy_ingest
+      community.docker.docker_compose_v2:
+        project_src: "/home/{{ ansible_user }}/call-review-platform/services/services/ingest-service"
+        state: present
+        build: always
+        recreate: always
ansible/roles/docker/tasks/main.yml (new file, 31 lines)
@@ -0,0 +1,31 @@
+---
+- name: Install required packages
+  apt:
+    name:
+      - apt-transport-https
+      - ca-certificates
+      - curl
+      - gnupg
+    state: present
+    update_cache: yes
+
+- name: Add Docker GPG key
+  apt_key:
+    url: https://download.docker.com/linux/ubuntu/gpg
+    state: present
+
+- name: Add Docker repository
+  apt_repository:
+    repo: "deb [arch=amd64] https://download.docker.com/linux/ubuntu focal stable"
+    state: present
+
+- name: Install Docker and Compose plugin
+  apt:
+    name:
+      - docker-ce
+      - docker-ce-cli
+      - containerd.io
+      - docker-buildx-plugin
+      - docker-compose-plugin
+    state: present
+    update_cache: yes
infra/docker-compose.yaml (modified)
@@ -1,6 +1,15 @@
-version: "3.8"
-
 services:
+  nginx:
+    image: nginx:latest
+    container_name: nginx
+    ports:
+      - "80:80"
+    volumes:
+      - ./nginx/conf.d:/etc/nginx/conf.d:ro
+    depends_on:
+      - minio
+    networks:
+      - call-review-network
   postgres:
     image: postgres:15-alpine
     container_name: ingest-postgres
@@ -8,8 +17,6 @@ services:
       POSTGRES_DB: ingest_db
       POSTGRES_USER: postgres
       POSTGRES_PASSWORD: postgres
-    ports:
-      - "5432:5432"
     volumes:
       - postgres_data:/var/lib/postgresql/data
     healthcheck:
@@ -19,9 +26,20 @@ services:
       retries: 5
     networks:
      - call-review-network
+  minio:
+    image: minio/minio
+    container_name: minio
+    environment:
+      MINIO_ROOT_USER: admin
+      MINIO_ROOT_PASSWORD: password123
+    command: server /data --console-address ":9001"
+    volumes:
+      - minio_data:/data
+    networks:
+      - call-review-network

 volumes:
   postgres_data:
+  minio_data:
+
 networks:
   call-review-network:
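The compose change above brings up MinIO behind nginx but does not create a bucket for the ingest service. Below is a minimal sketch of creating one with aioboto3 (the same library the service uses for uploads, see s3.py further down), assuming the root credentials from the compose file; the endpoint URL and the bucket name `call-records` are placeholders, since the real values live in the service's `.env`.

```python
import asyncio

import aioboto3


async def ensure_bucket(endpoint_url: str, bucket: str) -> None:
    """Create the bucket if it does not exist yet (idempotent)."""
    session = aioboto3.Session()
    async with session.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id="admin",            # MINIO_ROOT_USER from the compose file
        aws_secret_access_key="password123",  # MINIO_ROOT_PASSWORD from the compose file
        region_name="us-east-1",              # MinIO accepts any region name
    ) as s3:
        existing = await s3.list_buckets()
        names = {b["Name"] for b in existing.get("Buckets", [])}
        if bucket not in names:
            await s3.create_bucket(Bucket=bucket)


# Placeholder endpoint and bucket name: MinIO is only reachable through nginx in
# this compose file, so point this at however port 9000 is exposed in your setup.
asyncio.run(ensure_bucket("http://localhost:9000", "call-records"))
```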
infra/nginx/conf.d/ingest-service.conf (new file, 12 lines)
@@ -0,0 +1,12 @@
+server {
+    listen 80;
+    server_name crs-ingest-service.petrovskiy.ru;
+
+    location / {
+        proxy_pass http://ingest-service:8000;
+        proxy_set_header Host $host;
+        proxy_set_header X-Real-IP $remote_addr;
+        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+        proxy_set_header X-Forwarded-Proto $scheme;
+    }
+}
infra/nginx/conf.d/minio.conf (new file, 33 lines)
@@ -0,0 +1,33 @@
+server {
+    listen 80;
+    server_name s3.bikmeefftest.ru;
+
+    location / {
+        proxy_pass http://minio:9000;
+        proxy_set_header Host $host;
+        proxy_set_header X-Real-IP $remote_addr;
+        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+        proxy_set_header X-Forwarded-Proto $scheme;
+
+        proxy_http_version 1.1;
+        proxy_set_header Upgrade $http_upgrade;
+        proxy_set_header Connection "upgrade";
+    }
+}
+
+server {
+    listen 80;
+    server_name crs-minio.petrovskiy.ru;
+
+    location / {
+        proxy_pass http://minio:9001;
+        proxy_set_header Host $host;
+        proxy_set_header X-Real-IP $remote_addr;
+        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+        proxy_set_header X-Forwarded-Proto $scheme;
+
+        proxy_http_version 1.1;
+        proxy_set_header Upgrade $http_upgrade;
+        proxy_set_header Connection "upgrade";
+    }
+}
README (modified)
@@ -40,38 +40,50 @@ ingest-service/

 ### 1. Running with Docker Compose (recommended)

+> ⚠️ **IMPORTANT**: You must start the infrastructure (PostgreSQL) first, which creates the shared `call-review-network` network, and only then the microservice.
+
 **Step 1: Start the infrastructure (PostgreSQL)**
 ```bash
 # From the project root
-cd infra
-docker-compose up -d
+docker-compose -f infra/docker-compose.yaml up -d

-# Check that PostgreSQL is running
-docker-compose ps
+# Check that PostgreSQL is running and the network exists
+docker network ls | grep call-review
+docker ps --filter "name=ingest-postgres"
 ```

 **Step 2: Start the ingest-service microservice**
 ```bash
-# From the microservice directory
-cd ../services/ingest-service
-docker-compose up -d
+# From the project root
+docker-compose -f services/ingest-service/docker-compose.yaml up -d

-# Check the logs
-docker-compose logs -f
+# Check the logs (you will see the migrations being applied and the service starting)
+docker-compose -f services/ingest-service/docker-compose.yaml logs -f

 # Check the status
-docker-compose ps
+docker ps --filter "name=ingest"
+```
+
+**Verify it works:**
+```bash
+# Health check
+curl http://localhost:8000/health
+
+# Swagger UI
+open http://localhost:8000/docs
 ```

 **Stopping the services:**
 ```bash
-# Stop the microservice
-cd services/ingest-service
-docker-compose down
+# Stop the microservice (from the project root)
+docker-compose -f services/ingest-service/docker-compose.yaml down

-# Stop the infrastructure
-cd ../../infra
-docker-compose down
+# Stop the infrastructure (WARNING: this stops PostgreSQL!)
+docker-compose -f infra/docker-compose.yaml down
+
+# Or stop everything at once
+docker-compose -f services/ingest-service/docker-compose.yaml down && \
+docker-compose -f infra/docker-compose.yaml down
 ```

 The application will be available at: http://localhost:8000
Alembic initial migration (modified)
@@ -1,8 +1,8 @@
-"""Initial migration with call_events table
+"""INIT

-Revision ID: a7e5c5ef6bc1
+Revision ID: 49dc6736230f
 Revises:
-Create Date: 2025-11-19 22:43:33.739763
+Create Date: 2025-12-06 13:21:32.669444

 """
 from typing import Sequence, Union
@@ -12,14 +12,13 @@ import sqlalchemy as sa


 # revision identifiers, used by Alembic.
-revision: str = 'a7e5c5ef6bc1'
+revision: str = '49dc6736230f'
 down_revision: Union[str, Sequence[str], None] = None
 branch_labels: Union[str, Sequence[str], None] = None
 depends_on: Union[str, Sequence[str], None] = None


 def upgrade() -> None:
-    """Upgrade schema."""
     # ### commands auto generated by Alembic - please adjust! ###
     op.create_table('call_events',
     sa.Column('id', sa.UUID(), nullable=False),
@@ -36,17 +35,20 @@ def upgrade() -> None:
     sa.Column('clean_talk_time_duration', sa.Integer(), nullable=False),
     sa.Column('full_record_file_link', sa.String(), nullable=False),
     sa.Column('tcm_topcrm_notification_name', sa.String(), nullable=False),
+    sa.Column('s3_key', sa.String(), nullable=True),
+    sa.Column('processing_status', sa.String(), nullable=False),
+    sa.Column('retries', sa.Integer(), nullable=False),
     sa.PrimaryKeyConstraint('id')
     )
-    op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False)
+    op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=True)
     op.create_index(op.f('ix_call_events_id'), 'call_events', ['id'], unique=False)
     # ### end Alembic commands ###


 def downgrade() -> None:
-    """Downgrade schema."""
     # ### commands auto generated by Alembic - please adjust! ###
     op.drop_index(op.f('ix_call_events_id'), table_name='call_events')
     op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
     op.drop_table('call_events')
     # ### end Alembic commands ###
Alembic migration 'add_unique_constraint_to_call_session_id' (deleted)
@@ -1,33 +0,0 @@
-"""add_unique_constraint_to_call_session_id
-
-Revision ID: 9163176d6848
-Revises: a7e5c5ef6bc1
-Create Date: 2025-11-20 23:28:40.770696
-
-"""
-from typing import Sequence, Union
-
-from alembic import op
-import sqlalchemy as sa
-
-
-# revision identifiers, used by Alembic.
-revision: str = '9163176d6848'
-down_revision: Union[str, Sequence[str], None] = 'a7e5c5ef6bc1'
-branch_labels: Union[str, Sequence[str], None] = None
-depends_on: Union[str, Sequence[str], None] = None
-
-
-def upgrade() -> None:
-    # ### commands auto generated by Alembic - please adjust! ###
-    op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
-    op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=True)
-    # ### end Alembic commands ###
-
-
-def downgrade() -> None:
-    # ### commands auto generated by Alembic - please adjust! ###
-    op.drop_index(op.f('ix_call_events_call_session_id'), table_name='call_events')
-    op.create_index(op.f('ix_call_events_call_session_id'), 'call_events', ['call_session_id'], unique=False)
-    # ### end Alembic commands ###
services/ingest-service/app/config.py (new file, 18 lines)
@@ -0,0 +1,18 @@
+import os
+from pydantic_settings import BaseSettings
+
+class Settings(BaseSettings):
+    DATABASE_URL: str
+    S3_ENDPOINT_URL: str
+    AWS_ACCESS_KEY_ID: str
+    AWS_SECRET_ACCESS_KEY: str
+    AWS_REGION: str
+    S3_BUCKET_NAME: str
+    MAX_RETRIES: int
+    RETRY_DELAY: int
+
+    # class Config:
+    #     env_file = ".env"
+    #     env_file_encoding = "utf-8"
+
+settings = Settings()
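Settings declares no defaults and the env_file block is commented out, so `settings = Settings()` is built from process environment variables at import time. A small sketch of what has to be present for that import to succeed; all values here are illustrative (the DSN matches the old compose environment, the S3 credentials match the MinIO service above, and the bucket name is made up).

```python
import os

# Illustrative values only; in the compose setup these come from the .env file
# referenced by env_file in services/ingest-service/docker-compose.yaml.
os.environ.update({
    "DATABASE_URL": "postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db",
    "S3_ENDPOINT_URL": "http://minio:9000",
    "AWS_ACCESS_KEY_ID": "admin",
    "AWS_SECRET_ACCESS_KEY": "password123",
    "AWS_REGION": "us-east-1",
    "S3_BUCKET_NAME": "call-records",
    "MAX_RETRIES": "3",
    "RETRY_DELAY": "5",
})

from app.config import settings  # Settings() is instantiated here, at import time

print(settings.MAX_RETRIES, settings.RETRY_DELAY)  # pydantic coerces "3"/"5" to ints
```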
services/ingest-service/app/infrastructure/s3.py (new file, 67 lines)
@@ -0,0 +1,67 @@
+import aioboto3
+import asyncio
+import logging
+import os
+
+import aiohttp
+from app.config import settings
+
+logger = logging.getLogger(__name__)
+
+async def download_and_upload_stream(url: str, key_name: str) -> bool:
+    """
+    Downloads the audio file from the URL in chunks and hands it to upload_file_to_s3 for uploading.
+    """
+    try:
+        logger.info(f"[INFO] Started downloading {url}")
+        async with aiohttp.ClientSession() as http_session:
+            async with http_session.get(url) as resp:
+                resp.raise_for_status()
+
+                # read the file in chunks and assemble the bytes
+                chunks = []
+                while True:
+                    chunk = await resp.content.read(1024 * 1024)  # 1 MB
+                    if not chunk:
+                        break
+                    chunks.append(chunk)
+                file_bytes = b"".join(chunks)
+
+        logger.info(f"[INFO] Finished downloading {url}, size {len(file_bytes)} bytes")
+
+        # hand off to the upload function with retries
+        success = await upload_file_to_s3(file_bytes, key_name)
+        return success
+
+    except Exception as e:
+        logger.error(f"[FAILED] Could not download or upload {url}: {e}")
+        return False
+
+async def upload_file_to_s3(file_bytes: bytes, key_name: str) -> bool:
+    """
+    Asynchronously uploads a file to S3 with retries.
+    Returns True on success, False on failure.
+    """
+    session = aioboto3.Session()
+    for attempt in range(1, settings.MAX_RETRIES + 1):
+        try:
+            logger.info(f"Attempt {attempt} to upload {key_name} to S3")
+            # logger.info(f"S3 parameters: {S3_ENDPOINT_URL}, {S3_BUCKET_NAME}, {AWS_REGION}")
+
+            async with session.client(
+                "s3",
+                endpoint_url=settings.S3_ENDPOINT_URL,
+                aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
+                aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
+                region_name=settings.AWS_REGION
+            ) as s3:
+                await s3.put_object(Bucket=settings.S3_BUCKET_NAME, Key=key_name, Body=file_bytes)
+                logger.info(f"[OK] File {key_name} uploaded to S3!")
+                return True
+        except Exception as e:
+            logger.exception(f"[ERROR] Attempt {attempt} for {key_name} failed: {e}")
+            if attempt < settings.MAX_RETRIES:
+                await asyncio.sleep(settings.RETRY_DELAY)
+            else:
+                logger.error(f"[FAILED] Could not upload file {key_name} after {settings.MAX_RETRIES} attempts")
+                return False
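A usage sketch for download_and_upload_stream: it is a coroutine returning True or False, so a one-off call looks like the following. The URL and object key are placeholders, and the settings environment from the previous sketch is assumed to be in place.

```python
import asyncio
import logging

from app.infrastructure.s3 import download_and_upload_stream

logging.basicConfig(level=logging.INFO)


async def main() -> None:
    # Placeholder URL and key; in the service these come from a call_events row.
    ok = await download_and_upload_stream(
        "https://example.com/records/call-123.mp3",
        "audio/call-123.mp3",
    )
    print("uploaded" if ok else "upload failed")


asyncio.run(main())
```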
services/ingest-service/app/infrastructure/scheduler.py (new file, 60 lines)
@@ -0,0 +1,60 @@
+import asyncio
+import logging
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
+from app.models.call_event import CallEvent
+from app.infrastructure.s3 import download_and_upload_stream
+from app.infrastructure.database import get_db
+from app.config import settings
+
+logger = logging.getLogger(__name__)
+
+BATCH_SIZE = 20
+MAX_RETRIES_CALL = 3
+async def fetch_pending_calls(session: AsyncSession):
+    stmt = (
+        select(CallEvent)
+        .where(CallEvent.processing_status == "pending")
+        .where(CallEvent.retries < MAX_RETRIES_CALL)
+        .order_by(CallEvent.id)
+        .limit(BATCH_SIZE)
+        .with_for_update(skip_locked=True)
+    )
+    result = await session.execute(stmt)
+    calls = result.scalars().all()
+    return calls
+
+async def process_pending_calls():
+    logger.info("[INFO] Starting processing of new records")
+
+    async for session in get_db():
+        calls = await fetch_pending_calls(session)
+        if not calls:
+            logger.info("[INFO] No new records to process")
+            return
+
+        # bump retries immediately and mark the rows as processing
+        for call in calls:
+            call.retries += 1
+            call.processing_status = "processing"
+        await session.commit()
+
+        # create the download/upload tasks
+        tasks = []
+        for call in calls:
+            key_name = f"audio/{call.id}.mp3"
+            logger.info("Trying to download and upload recording %s", call.full_record_file_link)
+            tasks.append(download_and_upload_stream(call.full_record_file_link, key_name))
+
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+
+        # update statuses after processing
+        for call, result in zip(calls, results):
+            if isinstance(result, Exception) or result is False:
+                call.processing_status = "failed"
+            else:
+                call.processing_status = "done"
+                call.s3_key = f"audio/{call.id}.mp3"
+
+        await session.commit()
+        logger.info(f"Finished processing a batch of {len(calls)} records")
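process_pending_calls is the job that the scheduler in main.py (next diff) runs every 10 seconds, but it can also be invoked once by hand, which is convenient for debugging a batch. A sketch, assuming the database from the infra compose file is reachable and the settings environment is configured as above.

```python
import asyncio
import logging

from app.infrastructure.scheduler import process_pending_calls

logging.basicConfig(level=logging.INFO)

# Runs a single batch: selects up to BATCH_SIZE pending call_events rows with
# retries < MAX_RETRIES_CALL, marks them processing, uploads their recordings,
# then marks each one done or failed.
asyncio.run(process_pending_calls())
```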
services/ingest-service/app/main.py (modified)
@@ -1,10 +1,16 @@
-from contextlib import asynccontextmanager
 from fastapi import FastAPI, Request, status
 from fastapi.responses import JSONResponse
+from fastapi.concurrency import asynccontextmanager
 import logging
+from app.infrastructure.scheduler import process_pending_calls

 from app.api.uis import router as uis_router
 from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+
+from fastapi import FastAPI
+import asyncio
+from app.infrastructure.database import get_db

 # Logging configuration
 logging.basicConfig(
@@ -13,13 +19,23 @@ logging.basicConfig(
 )
 logger = logging.getLogger(__name__)

+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    scheduler = AsyncIOScheduler()
+
+    scheduler.add_job(process_pending_calls, "interval", seconds=10)
+
+    scheduler.start()
+    yield
+    await scheduler.shutdown(wait=True)
+
 app = FastAPI(
     title="Ingest Service API",
     description="Microservice for receiving call events",
     version="1.0.0",
+    lifespan=lifespan
 )


 # Exception handlers
 @app.exception_handler(CallEventAlreadyExistsError)
 async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError):
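For reference, a stripped-down standalone sketch of the lifespan pattern introduced above, reduced to just the scheduler wiring. One detail worth noting: in APScheduler 3.x, `shutdown()` is a plain synchronous method, so this sketch calls it without `await`.

```python
from contextlib import asynccontextmanager

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI


async def tick() -> None:
    print("job ran")


@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncIOScheduler()
    scheduler.add_job(tick, "interval", seconds=10)
    scheduler.start()  # needs a running event loop, which lifespan provides
    yield
    scheduler.shutdown(wait=True)  # synchronous in APScheduler 3.x


app = FastAPI(lifespan=lifespan)
```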
services/ingest-service/app/models/call_event.py (modified)
@@ -1,7 +1,9 @@
-from sqlalchemy import Column, BigInteger, Integer, String, DateTime
+from sqlalchemy import Column, BigInteger, Integer, String, DateTime, Enum as SQLEnum
+from enum import Enum
 from sqlalchemy.dialects.postgresql import UUID as PG_UUID
 import uuid
 from app.infrastructure.database import Base
+from enum import Enum as PyEnum

 class CallEvent(Base):
     __tablename__ = "call_events"
@@ -20,3 +22,7 @@ class CallEvent(Base):
     clean_talk_time_duration = Column(Integer, nullable=False)
     full_record_file_link = Column(String, nullable=False)
     tcm_topcrm_notification_name = Column(String, nullable=False)
+
+    s3_key = Column(String, nullable=True)
+    processing_status = Column(String, default="pending", nullable=False)
+    retries = Column(Integer, default=0, nullable=False)
services/ingest-service/docker-compose.yaml (modified)
@@ -6,12 +6,8 @@ services:
       context: .
       dockerfile: Dockerfile
     container_name: ingest-service
-    environment:
-      # Connection to PostgreSQL from infra/docker-compose.yaml
-      DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db
-      APP_HOST: 0.0.0.0
-      APP_PORT: 8000
-      DEBUG: "False"
+    env_file:
+      - .env
     ports:
       - "8000:8000"
     networks:
@@ -21,7 +17,13 @@ services:
     command: >
       sh -c "alembic upgrade head && uvicorn app.main:app --host 0.0.0.0 --port 8000"
     healthcheck:
-      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()"]
+      test:
+        [
+          "CMD",
+          "python",
+          "-c",
+          "import urllib.request; urllib.request.urlopen('http://localhost:8000/health').read()",
+        ]
       interval: 30s
       timeout: 3s
       start_period: 10s
services/ingest-service/requirements.txt (modified)
@@ -5,4 +5,6 @@ sqlalchemy==2.0.23
 asyncpg==0.29.0
 alembic==1.13.1
 python-dotenv==1.0.0
+aioboto3==15.5.0
+apscheduler==3.11.1
+pydantic_settings==2.11.0