feature: added exception handling

This commit is contained in:
2025-11-20 23:38:26 +07:00
parent 5d5b6140d4
commit 3d4430134b
8 changed files with 237 additions and 26 deletions

View File

@@ -4,4 +4,5 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db
# Application # Application
APP_HOST=0.0.0.0 APP_HOST=0.0.0.0
APP_PORT=8000 APP_PORT=8000
DEBUG=False

View File

@@ -1,39 +1,79 @@
from datetime import datetime from datetime import datetime
from enum import Enum from enum import Enum
from typing import List from typing import List
import logging
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, HttpUrl from pydantic import BaseModel, HttpUrl, Field, field_validator
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.infrastructure.database import get_db from app.infrastructure.database import get_db
from app import crud from app import crud
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
logger = logging.getLogger(__name__)
class CallDirection(str, Enum): class CallDirection(str, Enum):
"""Направление звонка"""
in_ = "in" in_ = "in"
out = "out" out = "out"
class UisCallEvent(BaseModel): class UisCallEvent(BaseModel):
eventType: str """Схема события звонка от UIS"""
call_session_id: int eventType: str = Field(..., description="Тип события")
direction: CallDirection call_session_id: int = Field(..., gt=0, description="Уникальный ID сессии звонка")
employee_id: int direction: CallDirection = Field(..., description="Направление звонка")
employee_full_name: str employee_id: int = Field(..., gt=0, description="ID сотрудника")
contact_phone_number: str employee_full_name: str = Field(..., min_length=1, description="ФИО сотрудника")
called_phone_number: str contact_phone_number: str = Field(..., description="Телефон контакта")
communication_group_name: str called_phone_number: str = Field(..., description="Набранный телефон")
start_time: datetime communication_group_name: str = Field(..., description="Группа коммуникации")
finish_time: datetime start_time: datetime = Field(..., description="Время начала звонка")
talk_time_duration: int finish_time: datetime = Field(..., description="Время окончания звонка")
full_record_file_link: HttpUrl talk_time_duration: int = Field(..., ge=0, description="Длительность разговора (сек)")
campaign_name: str full_record_file_link: HttpUrl = Field(..., description="Ссылка на запись")
campaign_name: str = Field(..., description="Название кампании")
@field_validator('finish_time')
@classmethod
def validate_finish_time(cls, v: datetime, info) -> datetime:
"""Проверяем, что finish_time >= start_time"""
if 'start_time' in info.data and v < info.data['start_time']:
raise ValueError('finish_time must be greater than or equal to start_time')
return v
router = APIRouter() router = APIRouter()
@router.post("/webhook", status_code=204) @router.post("/webhook", status_code=204)
async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(get_db)) -> None: async def create_call_event(
"""Webhook для получения событий звонков от UIS""" callEvent: UisCallEvent,
db: AsyncSession = Depends(get_db)
) -> None:
"""
Webhook для получения событий звонков от UIS
**Обрабатывает события:**
- Сохраняет событие звонка в базу данных
- Проверяет уникальность call_session_id
- Логирует успешную обработку
**Возможные ошибки:**
- 409 Conflict: Событие с таким call_session_id уже существует
- 422 Unprocessable Entity: Ошибка валидации данных
- 500 Internal Server Error: Ошибка сервера/БД
"""
logger.info(
f"Received webhook event",
extra={
"event_type": callEvent.eventType,
"call_session_id": callEvent.call_session_id,
"employee_id": callEvent.employee_id
}
)
# Преобразуем UisCallEvent в данные для CallEvent (БД) # Преобразуем UisCallEvent в данные для CallEvent (БД)
event_data = { event_data = {
@@ -53,5 +93,5 @@ async def create_call_event(callEvent: UisCallEvent, db: AsyncSession = Depends(
"clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение "clean_talk_time_duration": callEvent.talk_time_duration, # Упрощение
} }
# Сохраняем в БД # Сохраняем в БД (исключения обрабатываются в exception handlers)
await crud.create_call_event(db, event_data) await crud.create_call_event(db, event_data)

View File

@@ -0,0 +1,29 @@
"""Кастомные исключения для приложения"""
class AppException(Exception):
"""Базовое исключение приложения"""
def __init__(self, message: str, details: dict = None):
self.message = message
self.details = details or {}
super().__init__(self.message)
class CallEventAlreadyExistsError(AppException):
"""Событие звонка с таким call_session_id уже существует"""
def __init__(self, call_session_id: int):
super().__init__(
message=f"Call event with session_id {call_session_id} already exists",
details={"call_session_id": call_session_id}
)
class DatabaseError(AppException):
"""Ошибка работы с базой данных"""
pass
class ValidationError(AppException):
"""Ошибка валидации данных"""
pass

View File

@@ -1,11 +1,61 @@
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
from asyncpg.exceptions import UniqueViolationError
import logging
from app.models.call_event import CallEvent from app.models.call_event import CallEvent
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError
logger = logging.getLogger(__name__)
async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent: async def create_call_event(db: AsyncSession, event_data: dict) -> CallEvent:
"""Создать новое событие звонка в БД""" """Создать новое событие звонка в БД
Args:
db: Async сессия базы данных
event_data: Данные события звонка
Returns:
CallEvent: Созданное событие
Raises:
CallEventAlreadyExistsError: Если событие с таким call_session_id уже существует
DatabaseError: При других ошибках БД
"""
try:
call_event = CallEvent(**event_data) call_event = CallEvent(**event_data)
db.add(call_event) db.add(call_event)
await db.commit() await db.commit()
await db.refresh(call_event) await db.refresh(call_event)
logger.info(
f"Call event created successfully",
extra={
"call_session_id": call_event.call_session_id,
"employee_id": call_event.employee_id
}
)
return call_event return call_event
except IntegrityError as e:
await db.rollback()
# Проверяем, является ли это ошибкой уникальности call_session_id
if isinstance(e.orig, UniqueViolationError):
call_session_id = event_data.get("call_session_id")
logger.warning(
f"Duplicate call event detected",
extra={"call_session_id": call_session_id}
)
raise CallEventAlreadyExistsError(call_session_id)
# Другие ошибки целостности
logger.error(f"Database integrity error: {str(e)}")
raise DatabaseError(f"Failed to create call event: {str(e)}")
except Exception as e:
await db.rollback()
logger.error(f"Unexpected error creating call event: {str(e)}", exc_info=True)
raise DatabaseError(f"Unexpected database error: {str(e)}")

View File

@@ -2,13 +2,28 @@ import os
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db") DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://postgres:postgres@localhost:5432/ingest_db")
DEBUG = os.getenv("DEBUG", "False").lower() in ("true", "1", "yes")
# Единый async движок для всего приложения # Единый async движок для всего приложения
engine = create_async_engine(DATABASE_URL, echo=True) # echo=True включается только в режиме DEBUG
AsyncSessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) engine = create_async_engine(
DATABASE_URL,
echo=DEBUG,
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600,
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
Base = declarative_base() Base = declarative_base()

View File

@@ -1,6 +1,17 @@
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from fastapi import FastAPI from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
import logging
from app.api.uis import router as uis_router from app.api.uis import router as uis_router
from app.core.exceptions import CallEventAlreadyExistsError, DatabaseError, ValidationError
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI( app = FastAPI(
title="Ingest Service API", title="Ingest Service API",
@@ -8,4 +19,68 @@ app = FastAPI(
version="1.0.0", version="1.0.0",
) )
# Exception handlers
@app.exception_handler(CallEventAlreadyExistsError)
async def call_event_exists_handler(request: Request, exc: CallEventAlreadyExistsError):
"""Обработчик для дубликатов событий звонков"""
logger.warning(f"Duplicate call event: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_409_CONFLICT,
content={
"error": "duplicate_call_event",
"message": exc.message,
"details": exc.details
}
)
@app.exception_handler(DatabaseError)
async def database_error_handler(request: Request, exc: DatabaseError):
"""Обработчик для ошибок БД"""
logger.error(f"Database error: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "database_error",
"message": "Internal database error occurred",
"details": exc.details
}
)
@app.exception_handler(ValidationError)
async def validation_error_handler(request: Request, exc: ValidationError):
"""Обработчик для ошибок валидации"""
logger.warning(f"Validation error: {exc.message}", extra=exc.details)
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content={
"error": "validation_error",
"message": exc.message,
"details": exc.details
}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""Общий обработчик для неперехваченных исключений"""
logger.error(f"Unhandled exception: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={
"error": "internal_server_error",
"message": "An unexpected error occurred"
}
)
# Health check endpoint
@app.get("/health", tags=["Health"])
async def health_check():
"""Проверка состояния сервиса"""
return {"status": "healthy", "service": "ingest-service"}
app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"]) app.include_router(uis_router, prefix="/v1/uis", tags=["UIS Webhooks"])

View File

@@ -11,6 +11,7 @@ services:
DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db DATABASE_URL: postgresql+asyncpg://postgres:postgres@ingest-postgres:5432/ingest_db
APP_HOST: 0.0.0.0 APP_HOST: 0.0.0.0
APP_PORT: 8000 APP_PORT: 8000
DEBUG: "False"
ports: ports:
- "8000:8000" - "8000:8000"
networks: networks: