From 3d4430134b8cbb02a7b3ff06dca90b4a48abfe90 Mon Sep 17 00:00:00 2001 From: Ilya Bikmeev Date: Thu, 20 Nov 2025 23:38:26 +0700 Subject: [PATCH] feature: added exception handling --- services/ingest-service/.env | 1 + services/ingest-service/app/api/uis.py | 74 ++++++++++++++---- services/ingest-service/app/core/__init__.py | 0 .../ingest-service/app/core/exceptions.py | 29 +++++++ services/ingest-service/app/crud.py | 62 +++++++++++++-- .../app/infrastructure/database.py | 19 ++++- services/ingest-service/app/main.py | 77 ++++++++++++++++++- services/ingest-service/docker-compose.yaml | 1 + 8 files changed, 237 insertions(+), 26 deletions(-) create mode 100644 services/ingest-service/app/core/__init__.py create mode 100644 services/ingest-service/app/core/exceptions.py diff --git a/services/ingest-service/.env b/services/ingest-service/.env index 371d03f..334b210 100644 --- a/services/ingest-service/.env +++ b/services/ingest-service/.env @@ -4,4 +4,5 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db # Application APP_HOST=0.0.0.0 APP_PORT=8000 +DEBUG=False diff --git a/services/ingest-service/app/api/uis.py b/services/ingest-service/app/api/uis.py index ed63f9a..e0b7f84 100644 --- a/services/ingest-service/app/api/uis.py +++ b/services/ingest-service/app/api/uis.py @@ -1,39 +1,79 @@ from datetime import datetime from enum import Enum from typing import List +import logging + from fastapi import APIRouter, Depends, HTTPException -from pydantic import BaseModel, HttpUrl +from pydantic import BaseModel, HttpUrl, Field, field_validator from sqlalchemy.ext.asyncio import AsyncSession from app.infrastructure.database import get_db from app import crud +from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError + +logger = logging.getLogger(__name__) + class CallDirection(str, Enum): + """Направление звонка""" in_ = "in" out = "out" class UisCallEvent(BaseModel): - eventType: str - call_session_id: int - direction: CallDirection - employee_id: int - employee_full_name: str - contact_phone_number: str - called_phone_number: str - communication_group_name: str - start_time: datetime - finish_time: datetime - talk_time_duration: int - full_record_file_link: HttpUrl - campaign_name: str + """Схема события звонка от UIS""" + eventType: str = Field(..., description="Тип события") + call_session_id: int = Field(..., gt=0, description="Уникальный ID сессии звонка") + direction: CallDirection = Field(..., description="Направление звонка") + employee_id: int = Field(..., gt=0, description="ID сотрудника") + employee_full_name: str = Field(..., min_length=1, description="ФИО сотрудника") + contact_phone_number: str = Field(..., description="Телефон контакта") + called_phone_number: str = Field(..., description="Набранный телефон") + communication_group_name: str = Field(..., description="Группа коммуникации") + start_time: datetime = Field(..., description="Время начала звонка") + finish_time: datetime = Field(..., description="Время окончания звонка") + talk_time_duration: int = Field(..., ge=0, description="Длительность разговора (сек)") + full_record_file_link: HttpUrl = Field(..., description="Ссылка на запись") + campaign_name: str = Field(..., description="Название кампании") + + @field_validator('finish_time') + @classmethod + def validate_finish_time(cls, v: datetime, info) -> datetime: + """Проверяем, что finish_time >= start_time""" + if 'start_time' in info.data and v < info.data['start_time']: + raise ValueError('finish_time must be greater than or equal to start_time') + return v + router = APIRouter() @router.post("/webhook", status_code=204) -async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None: - """Webhook для получения событий звонков от UIS""" +async def create_call_event( + callEvent: UisCallEvent, + db: AsyncSession = Depends(get_db) +) -> None: + """ + Webhook для получения событий звонков от UIS + + **Обрабатывает события:** + - Сохраняет событие звонка в базу данных + - Проверяет уникальность call_session_id + - Логирует успешную обработку + + **Возможные ошибки:** + - 409 Conflict: Событие с таким call_session_id уже существует + - 422 Unprocessable Entity: Ошибка валидации данных + - 500 Internal Server Error: Ошибка сервера/БД + """ + logger.info( + f"Received webhook event", + extra={ + "event_type": callEvent.eventType, + "call_session_id": callEvent.call_session_id, + "employee_id": callEvent.employee_id + } + ) # Преобразуем UisCallEvent в данные для CallEvent (БД) event_data = { @@ -53,5 +93,5 @@ async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends( "clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение } - # Сохраняем в БД + # Сохраняем в БД (исключения обрабатываются в exception handlers) await crud.create_call_event(db, event_data) \ No newline at end of file diff --git a/services/ingest-service/app/core/__init__.py b/services/ingest-service/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/ingest-service/app/core/exceptions.py b/services/ingest-service/app/core/exceptions.py new file mode 100644 index 0000000..d120e52 --- /dev/null +++ b/services/ingest-service/app/core/exceptions.py @@ -0,0 +1,29 @@ +"""Кастомные исключения для приложения""" + + +class AppException(Exception): + """Базовое исключение приложения""" + def __init__(self, message: str, details: dict = None): + self.message = message + self.details = details or {} + super().__init__(self.message) + + +class CallEventAlreadyExistsError(AppException): + """Событие звонка с таким call_session_id уже существует""" + def __init__(self, call_session_id: int): + super().__init__( + message=f"Call event with session_id {call_session_id} already exists", + details={"call_session_id": call_session_id} + ) + + +class DatabaseError(AppException): + """Ошибка работы с базой данных""" + pass + + +class ValidationError(AppException): + """Ошибка валидации данных""" + pass + diff --git a/services/ingest-service/app/crud.py b/services/ingest-service/app/crud.py index 0855e91..da95ddb 100644 --- a/services/ingest-service/app/crud.py +++ b/services/ingest-service/app/crud.py @@ -1,11 +1,61 @@ from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.exc import IntegrityError +from asyncpg.exceptions import UniqueViolationError +import logging + from app.models.call_event import CallEvent +from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError + +logger = logging.getLogger(__name__) async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent: - """Создать новое событие звонка в БД""" - call_event = CallEvent(**event_data) - db.add(call_event) - await db.commit() - await db.refresh(call_event) - return call_event + """Создать новое событие звонка в БД + + Args: + db: Async сессия базы данных + event_data: Данные события звонка + + Returns: + CallEvent: Созданное событие + + Raises: + CallEventAlreadyExistsError: Если событие с таким call_session_id уже существует + DatabaseError: При других ошибках БД + """ + try: + call_event = CallEvent(**event_data) + db.add(call_event) + await db.commit() + await db.refresh(call_event) + + logger.info( + f"Call event created successfully", + extra={ + "call_session_id": call_event.call_session_id, + "employee_id": call_event.employee_id + } + ) + + return call_event + + except IntegrityError as e: + await db.rollback() + + # Проверяем, является ли это ошибкой уникальности call_session_id + if isinstance(e.orig, UniqueViolationError): + call_session_id = event_data.get("call_session_id") + logger.warning( + f"Duplicate call event detected", + extra={"call_session_id": call_session_id} + ) + raise CallEventAlreadyExistsError(call_session_id) + + # Другие ошибки целостности + logger.error(f"Database integrity error: {str(e)}") + raise DatabaseError(f"Failed to create call event: {str(e)}") + + except Exception as e: + await db.rollback() + logger.error(f"Unexpected error creating call event: {str(e)}", exc_info=True) + raise DatabaseError(f"Unexpected database error: {str(e)}") diff --git a/services/ingest-service/app/infrastructure/database.py b/services/ingest-service/app/infrastructure/database.py index c438538..98ef2fd 100644 --- a/services/ingest-service/app/infrastructure/database.py +++ b/services/ingest-service/app/infrastructure/database.py @@ -2,13 +2,28 @@ import os from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.ext.declarative import declarative_base from dotenv import load_dotenv + load_dotenv() DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db") +DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "yes") # Единый async движок для всего приложения -engine = create_async_engine(DATABASE_URL, echo=True) -AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) +# echo=True включается только в режиме DEBUG +engine = create_async_engine( + DATABASE_URL, + echo=DEBUG, + pool_size=10, + max_overflow=20, + pool_pre_ping=True, + pool_recycle=3600, +) + +AsyncSessionLocal = async_sessionmaker( + engine, + class_=AsyncSession, + expire_on_commit=False +) Base = declarative_base() diff --git a/services/ingest-service/app/main.py b/services/ingest-service/app/main.py index 29f80cb..29592f9 100644 --- a/services/ingest-service/app/main.py +++ b/services/ingest-service/app/main.py @@ -1,6 +1,17 @@ from contextlib import asynccontextmanager -from fastapi import FastAPI +from fastapi import FastAPI, Request, status +from fastapi.responses import JSONResponse +import logging + from app.api.uis import router as uis_router +from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError + +# Настройка логирования +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) app = FastAPI( title="Ingest Service API", @@ -8,4 +19,68 @@ app = FastAPI( version="1.0.0", ) + +# Exception handlers +@app.exception_handler(CallEventAlreadyExistsError) +async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError): + """Обработчик для дубликатов событий звонков""" + logger.warning(f"Duplicate call event: {exc.message}", extra=exc.details) + return JSONResponse( + status_code=status.HTTP_409_CONFLICT, + content={ + "error": "duplicate_call_event", + "message": exc.message, + "details": exc.details + } + ) + + +@app.exception_handler(DatabaseError) +async def database_error_handler(request: Request, exc: DatabaseError): + """Обработчик для ошибок БД""" + logger.error(f"Database error: {exc.message}", extra=exc.details) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={ + "error": "database_error", + "message": "Internal database error occurred", + "details": exc.details + } + ) + + +@app.exception_handler(ValidationError) +async def validation_error_handler(request: Request, exc: ValidationError): + """Обработчик для ошибок валидации""" + logger.warning(f"Validation error: {exc.message}", extra=exc.details) + return JSONResponse( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + content={ + "error": "validation_error", + "message": exc.message, + "details": exc.details + } + ) + + +@app.exception_handler(Exception) +async def general_exception_handler(request: Request, exc: Exception): + """Общий обработчик для неперехваченных исключений""" + logger.error(f"Unhandled exception: {str(exc)}", exc_info=True) + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={ + "error": "internal_server_error", + "message": "An unexpected error occurred" + } + ) + + +# Health check endpoint +@app.get("/health", tags=["Health"]) +async def health_check(): + """Проверка состояния сервиса""" + return {"status": "healthy", "service": "ingest-service"} + + app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"]) \ No newline at end of file diff --git a/services/ingest-service/docker-compose.yaml b/services/ingest-service/docker-compose.yaml index 16711bf..3432814 100644 --- a/services/ingest-service/docker-compose.yaml +++ b/services/ingest-service/docker-compose.yaml @@ -11,6 +11,7 @@ services: DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db APP_HOST: 0.0.0.0 APP_PORT: 8000 + DEBUG: "False" ports: - "8000:8000" networks: