Files
tg_resume_db/extract/llm.py
2026-03-11 15:27:10 +03:00

586 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import hashlib
import json
import os
import re
import sqlite3
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
try:
import httpx # type: ignore
except Exception: # pragma: no cover
httpx = None # type: ignore
def resolve_llm_runtime() -> Dict[str, str]:
    """
    Resolve the OpenAI-compatible runtime configuration from the environment.

    Two families of variables are honoured:
      - generic: LLM_PROVIDER / LLM_BASE_URL / LLM_MODEL / LLM_API_KEY
      - Mistral aliases: MISTRAL_API_KEY / MISTRAL_MODEL / MISTRAL_BASE_URL

    Returns a dict with keys: "provider", "base_url", "model", "api_key".
    """

    def _env(name: str, default: str = "") -> str:
        # Treat unset and empty-string vars identically, then strip whitespace.
        return (os.environ.get(name) or default).strip()

    provider = _env("LLM_PROVIDER").lower()
    base_url = _env("LLM_BASE_URL")
    model = _env("LLM_MODEL")
    api_key = _env("LLM_API_KEY")

    mistral_key = _env("MISTRAL_API_KEY")
    mistral_model = _env("MISTRAL_MODEL")
    mistral_base = _env("MISTRAL_BASE_URL", "https://api.mistral.ai/v1")

    # Fall back to the Mistral aliases wherever the generic vars are unset.
    api_key = api_key or mistral_key
    model = model or mistral_model
    mistral_hinted = (
        mistral_key
        or mistral_model
        or provider == "mistral"
        or os.environ.get("MISTRAL_BASE_URL")
    )
    if not base_url and mistral_hinted:
        base_url = mistral_base

    if base_url:
        base_url = base_url.rstrip("/")

    # Infer the provider label when it was not given explicitly.
    if not provider:
        looks_mistral = "mistral.ai" in base_url or (model and model.lower().startswith("mistral"))
        provider = "mistral" if looks_mistral else "generic"

    return {
        "provider": provider,
        "base_url": base_url,
        "model": model,
        "api_key": api_key,
    }
# ------------- Public API -------------
def llm_parse_enabled() -> bool:
    """
    Report whether LLM-based parsing can run.

    True only when httpx is importable and both base_url and model resolve
    from the environment.  Opt out explicitly with LLM_PARSE_ENABLED=0
    (also accepts "false"/"no"); the flag defaults to enabled.
    """
    if httpx is None:
        return False
    flag = os.environ.get("LLM_PARSE_ENABLED", "1").lower()
    if flag in ("0", "false", "no"):
        return False
    rt = resolve_llm_runtime()
    return bool(rt["base_url"]) and bool(rt["model"])
# Version tags baked into cache keys: bump whenever the corresponding prompt
# template changes so old cached results are not reused for the new prompt.
_PROMPT_VERSION = "v3_sections_doc_type"
_REVIEW_PROMPT_VERSION = "v1_review_merge"
@dataclass
class LLMExtraction:
    """Normalized structured fields extracted from a resume by the LLM."""

    roles: List[str]
    skills: List[str]
    primary_languages: List[str]
    seniority: Optional[str]
    backend_focus: Optional[bool]
    experience_years_total: Optional[float]
    experience_years_engineering: Optional[float]
    english_level: Optional[str]
    location: Optional[str]
    remote_ok: Optional[bool]
    salary_min_usd: Optional[int]
    salary_max_usd: Optional[int]
    salary_min_rub: Optional[int]
    salary_max_rub: Optional[int]
    highlights: List[str]
    keywords: List[str]

    @staticmethod
    def from_obj(obj: Dict[str, Any]) -> "LLMExtraction":
        """Build an LLMExtraction from a raw (possibly messy) JSON object,
        coercing every field to its canonical type and dropping blanks."""

        def _strings(value: Any) -> List[str]:
            # Scalars become a one-element list; blank entries are dropped.
            if value is None:
                return []
            if not isinstance(value, list):
                value = [value]
            return [text for text in (str(item).strip() for item in value) if text]

        def _floaty(value: Any) -> Optional[float]:
            try:
                return float(value)
            except Exception:
                return None

        def _inty(value: Any) -> Optional[int]:
            # Via float first so numeric strings like "1200.7" truncate cleanly.
            try:
                return int(float(value))
            except Exception:
                return None

        def _booly(value: Any) -> Optional[bool]:
            if isinstance(value, bool):
                return value
            if value is None:
                return None
            token = str(value).strip().lower()
            if token in ("true", "1", "yes", "y"):
                return True
            if token in ("false", "0", "no", "n"):
                return False
            return None

        raw_seniority = obj.get("seniority")
        raw_english = obj.get("english_level")
        raw_location = obj.get("location")

        return LLMExtraction(
            roles=_strings(obj.get("roles")),
            skills=_strings(obj.get("skills")),
            primary_languages=_strings(obj.get("primary_languages")),
            seniority=(str(raw_seniority).strip().lower() or None) if raw_seniority else None,
            backend_focus=_booly(obj.get("backend_focus")),
            experience_years_total=_floaty(obj.get("experience_years_total")),
            experience_years_engineering=_floaty(obj.get("experience_years_engineering")),
            english_level=(str(raw_english).strip().upper() or None) if raw_english else None,
            location=(str(raw_location).strip() or None) if raw_location else None,
            remote_ok=_booly(obj.get("remote_ok")),
            salary_min_usd=_inty(obj.get("salary_min_usd")),
            salary_max_usd=_inty(obj.get("salary_max_usd")),
            salary_min_rub=_inty(obj.get("salary_min_rub")),
            salary_max_rub=_inty(obj.get("salary_max_rub")),
            highlights=_strings(obj.get("highlights")),
            keywords=_strings(obj.get("keywords")),
        )
def llm_extract_profile(
    clean_text: str,
    *,
    con: Optional[sqlite3.Connection] = None,
    doc_type: Optional[str] = None,
    sections: Optional[Dict[str, str]] = None,
) -> Tuple[Optional[LLMExtraction], Dict[str, Any]]:
    """
    Extract a structured profile from resume text via the LLM.

    Returns (LLMExtraction | None, debug_info).  Results are cached
    (sqlite and/or disk) keyed by text hash + model + prompt version to
    keep throughput high; any failure degrades silently to None with the
    reason recorded in debug_info["error"].
    """
    runtime = resolve_llm_runtime()
    enabled = llm_parse_enabled()
    dbg: Dict[str, Any] = {
        "enabled": enabled,
        "provider": runtime.get("provider"),
        "model": runtime.get("model"),
        "from_cache": False,
        "cache_backend": None,
        "error": None,
        "prompt_version": _PROMPT_VERSION,
    }
    if not enabled:
        return None, dbg

    digest = hashlib.sha1(clean_text.encode("utf-8", errors="ignore")).hexdigest()
    cache_key = f"extract:{digest}:{runtime['model']}:{_PROMPT_VERSION}"

    payload = _build_payload(
        clean_text,
        doc_type=doc_type,
        sections=sections,
        prompt_version=_PROMPT_VERSION,
        temperature=float(os.environ.get("LLM_PARSE_TEMPERATURE", 0.1)),
        max_tokens=int(os.environ.get("LLM_PARSE_MAX_TOKENS", 700)),
        system_prompt="You output ONLY JSON for structured resume extraction.",
        prompt_template=_PROMPT_TEMPLATE,
    )
    raw = _cached_llm_json_call(
        con=con,
        cache_key=cache_key,
        model=runtime["model"],
        payload=payload,
        dbg=dbg,
    )
    if raw is None:
        return None, dbg
    return LLMExtraction.from_obj(raw), dbg
def llm_review_profile(
    clean_text: str,
    *,
    draft: Dict[str, Any],
    con: Optional[sqlite3.Connection] = None,
    doc_type: Optional[str] = None,
    sections: Optional[Dict[str, str]] = None,
) -> Tuple[Optional[LLMExtraction], Dict[str, Any]]:
    """
    Second-pass validator over an already-parsed draft.

    Sends the resume text plus the sanitized draft JSON back to the LLM,
    asking it to re-check every field against the text.  Returns a
    corrected extraction (for safe merging in the pipeline) plus debug
    info carrying quality_score / changed_fields / issues_found.
    """
    runtime = resolve_llm_runtime()
    enabled = llm_parse_enabled()
    dbg: Dict[str, Any] = {
        "enabled": enabled,
        "provider": runtime.get("provider"),
        "model": runtime.get("model"),
        "from_cache": False,
        "cache_backend": None,
        "error": None,
        "prompt_version": _REVIEW_PROMPT_VERSION,
        "quality_score": None,
        "changed_fields": [],
        "issues_found": [],
    }
    if not enabled:
        return None, dbg

    clean_draft = _sanitize_review_draft(draft)
    # sort_keys keeps the blob (and therefore the cache key) deterministic.
    draft_blob = json.dumps(clean_draft, ensure_ascii=False, sort_keys=True)
    text_digest = hashlib.sha1(clean_text.encode("utf-8", errors="ignore")).hexdigest()
    draft_digest = hashlib.sha1(draft_blob.encode("utf-8", errors="ignore")).hexdigest()
    cache_key = f"review:{text_digest}:{draft_digest}:{runtime['model']}:{_REVIEW_PROMPT_VERSION}"

    payload = _build_payload(
        clean_text,
        doc_type=doc_type,
        sections=sections,
        prompt_version=_REVIEW_PROMPT_VERSION,
        temperature=float(os.environ.get("LLM_REVIEW_TEMPERATURE", 0.0)),
        max_tokens=int(os.environ.get("LLM_REVIEW_MAX_TOKENS", 850)),
        system_prompt="You output ONLY JSON for resume parsing quality review.",
        prompt_template=_REVIEW_PROMPT_TEMPLATE,
        extra_vars={"draft_json": draft_blob},
    )
    data = _cached_llm_json_call(
        con=con,
        cache_key=cache_key,
        model=runtime["model"],
        payload=payload,
        dbg=dbg,
    )
    if data is None:
        return None, dbg

    # The model may return {"corrected": {...}, ...} or the bare object.
    corrected = data["corrected"] if isinstance(data.get("corrected"), dict) else data
    dbg["quality_score"] = _as_float(data.get("quality_score"))
    dbg["changed_fields"] = _as_str_list(data.get("changed_fields"))
    dbg["issues_found"] = _as_str_list(data.get("issues_found"))
    return LLMExtraction.from_obj(corrected), dbg
# ------------- Internal helpers -------------
# Extraction-pass prompt (Russian — the target resume corpus is Russian).
# Rendered via str.format with {doc_type}, {sections_block} and {resume_text};
# literal braces in the JSON schema are doubled ({{ }}) to survive .format().
_PROMPT_TEMPLATE = """
Ты - ассистент, который структурирует резюме разработчиков. Отвечай ТОЛЬКО JSON.
Используй только факты из текста, ничего не придумывай. Если данных нет - ставь null или пустой список.
Схема:
{{
"roles": ["backend","devops","frontend","qa","data engineer","android","ios"],
"skills": ["python","go","k8s","postgres","react", "..."],
"primary_languages": ["python","go","java","c++", "..."],
"seniority": "intern|junior|middle|senior|lead|principal|null",
"backend_focus": true|false|null,
"experience_years_total": number|null,
"experience_years_engineering": number|null,
"english_level": "A1|A2|B1|B2|C1|C2|null",
"location": "city, country|null",
"remote_ok": true|false|null,
"salary_min_usd": int|null,
"salary_max_usd": int|null,
"salary_min_rub": int|null,
"salary_max_rub": int|null,
"highlights": ["кратко достижения (1-2 предложения)"],
"keywords": ["уникальные ключевые слова, продукты или домены"]
}}
Не включай контактные данные в skills/keywords.
Detected doc_type: {doc_type}
Sections (if present):
{sections_block}
Full text snippet (use only if needed):
```TEXT
{resume_text}
```
"""
# Review-pass prompt: additionally receives the draft JSON via {draft_json}
# and must return {"corrected": ..., "changed_fields": ..., "issues_found": ...,
# "quality_score": ...} — the shape llm_review_profile unpacks.
_REVIEW_PROMPT_TEMPLATE = """
Ты валидатор качества парсинга резюме разработчиков. Отвечай ТОЛЬКО JSON.
У тебя есть черновой JSON после эвристик/первичного парсинга. Нужно перепроверить каждое поле по тексту резюме.
Исправляй только то, что прямо подтверждается текстом. Нельзя выдумывать.
Верни JSON строго такой формы:
{{
"corrected": {{
"roles": ["..."],
"skills": ["..."],
"primary_languages": ["..."],
"seniority": "intern|junior|middle|senior|lead|principal|null",
"backend_focus": true|false|null,
"experience_years_total": number|null,
"experience_years_engineering": number|null,
"english_level": "A1|A2|B1|B2|C1|C2|null",
"location": "city, country|null",
"remote_ok": true|false|null,
"salary_min_usd": int|null,
"salary_max_usd": int|null,
"salary_min_rub": int|null,
"salary_max_rub": int|null,
"highlights": ["..."],
"keywords": ["..."]
}},
"changed_fields": ["field_name", "..."],
"issues_found": ["кратко что было неверно/сомнительно", "..."],
"quality_score": 0.0
}}
Черновик JSON:
```DRAFT
{draft_json}
```
Detected doc_type: {doc_type}
Sections (if present):
{sections_block}
Full text snippet (use only if needed):
```TEXT
{resume_text}
```
"""
def _trim_text(text: str, max_len: int = 9000) -> str:
"""
Keep head and tail to preserve summary + recent projects.
"""
if len(text) <= max_len:
return text
head = text[: max_len // 2]
tail = text[-max_len // 2 :]
return head + "\n...\n" + tail
def _build_payload(
    clean_text: str,
    *,
    doc_type: Optional[str],
    sections: Optional[Dict[str, str]],
    prompt_version: str,
    temperature: float,
    max_tokens: int,
    system_prompt: str,
    prompt_template: str,
    extra_vars: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Assemble the request task consumed by _llm_call_json.

    Renders *prompt_template* with the trimmed resume text, doc type and
    section snippets (plus any *extra_vars*, e.g. the review draft), and
    bundles it with the resolved endpoint, auth headers and an
    OpenAI-style chat-completion payload.
    """
    runtime = resolve_llm_runtime()

    fill: Dict[str, Any] = {
        "resume_text": _trim_text(clean_text),
        "doc_type": doc_type or "unknown",
        "sections_block": _build_sections_block(sections) or "(no sections detected)",
    }
    fill.update(extra_vars or {})

    chat_body = {
        "model": runtime["model"],
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt_template.format(**fill)},
        ],
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    return {
        "base_url": runtime["base_url"],
        "model": runtime["model"],
        "prompt_version": prompt_version,
        "payload": chat_body,
        "headers": _build_headers(runtime),
        "timeout": float(os.environ.get("LLM_PARSE_TIMEOUT", 18.0)),
    }
def _build_headers(runtime: Dict[str, str]) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
api_key = runtime.get("api_key", "")
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
return headers
def _cached_llm_json_call(
    *,
    con: Optional[sqlite3.Connection],
    cache_key: str,
    model: str,
    payload: Dict[str, Any],
    dbg: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
    """
    Call the LLM with two read-through cache layers: sqlite, then disk.

    On a cache hit, sets dbg["from_cache"]/dbg["cache_backend"] and returns
    the cached object.  On an LLM failure, sets dbg["error"] and returns
    None.  Cache *writes* are strictly best-effort: a failure to persist
    never discards a successful result.

    BUGFIX: previously the sqlite/disk cache writes sat inside the same
    try block as the network call, so e.g. a read-only disk made
    cache_path.write_text raise and a *successful* LLM response was thrown
    away (returned None).  Writes are now isolated from the call.
    """
    # Layer 1: sqlite cache (note: empty dicts are treated as a miss).
    data = _cache_get_sqlite(con, cache_key)
    if data:
        dbg["from_cache"] = True
        dbg["cache_backend"] = "sqlite"
        return data

    # Layer 2: disk cache; degrade silently if the directory is unusable.
    cache_dir = Path(os.environ.get("LLM_PARSE_CACHE", ".cache/llm_parse")).resolve()
    cache_path: Optional[Path] = None
    try:
        cache_dir.mkdir(parents=True, exist_ok=True)
        cache_path = cache_dir / (cache_key.replace(":", "_") + ".json")
    except Exception:
        cache_path = None
    if cache_path and cache_path.exists():
        try:
            data = json.loads(cache_path.read_text(encoding="utf-8"))
            dbg["from_cache"] = True
            dbg["cache_backend"] = "disk"
            return data
        except Exception:
            pass  # corrupt cache file: fall through to a fresh call

    # Actual LLM round trip — the only failure that should yield None.
    try:
        data = _llm_call_json(payload)
    except Exception as e:  # pragma: no cover - network/LLM failures
        dbg["error"] = repr(e)
        return None

    # Best-effort persistence; never let a cache failure lose the result.
    try:
        if con:
            _cache_put_sqlite(con, cache_key, model, data)
        if cache_path:
            cache_path.write_text(json.dumps(data, ensure_ascii=False), encoding="utf-8")
    except Exception:
        pass
    return data
def _llm_call_json(task: Dict[str, Any]) -> Dict[str, Any]:
    """
    POST an OpenAI-style chat completion and parse JSON out of the reply.

    Raises RuntimeError when httpx is unavailable, an httpx HTTP error on
    non-2xx responses, and ValueError when the reply holds no JSON object.
    """
    if httpx is None:
        raise RuntimeError("httpx is not installed")

    base_url: str = task["base_url"]
    body: Dict[str, Any] = task["payload"]
    timeout = float(task.get("timeout", 18.0))

    with httpx.Client(timeout=timeout) as client:
        resp = client.post(f"{base_url}/chat/completions", headers=task["headers"], json=body)
        resp.raise_for_status()
        reply = resp.json()

    content = reply["choices"][0]["message"]["content"]
    # Some providers return content as a list of typed blocks; flatten it.
    if isinstance(content, list):
        pieces = []
        for part in content:
            if isinstance(part, dict):
                pieces.append(str(part.get("text") or ""))
            else:
                pieces.append(str(part))
        content = "\n".join(pieces)
    content = str(content)

    # Grab the outermost {...} span so stray prose around the JSON is ignored.
    match = re.search(r"\{.*\}", content, flags=re.S)
    if not match:
        raise ValueError("LLM did not return JSON")
    return json.loads(match.group(0))
def _build_sections_block(sections: Optional[Dict[str, str]]) -> str:
    """
    Render detected resume sections as labelled snippets for the prompt.

    Sections are emitted in a fixed order (about, skills, experience,
    education, contacts), each trimmed to keep the prompt size bounded.
    Returns "" when nothing is available.
    """
    if not sections:
        return ""
    ordered_labels = (
        ("about", "ABOUT"),
        ("skills", "SKILLS"),
        ("experience", "EXPERIENCE"),
        ("education", "EDUCATION"),
        ("contacts", "CONTACTS"),
    )
    chunks = [
        f"[{label}]\n{_trim_text(sections[key], max_len=1800)}"
        for key, label in ordered_labels
        if sections.get(key)
    ]
    return "\n\n".join(chunks)
def _sanitize_review_draft(draft: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalize a draft dict before embedding it in the review prompt.

    Drops unknown keys, then round-trips the remainder through
    LLMExtraction.from_obj so every field is coerced to its canonical type
    and the output always has the full, predictable schema.
    """
    source = draft if isinstance(draft, dict) else {}
    allowed = {
        "roles",
        "skills",
        "primary_languages",
        "seniority",
        "backend_focus",
        "experience_years_total",
        "experience_years_engineering",
        "english_level",
        "location",
        "remote_ok",
        "salary_min_usd",
        "salary_max_usd",
        "salary_min_rub",
        "salary_max_rub",
        "highlights",
        "keywords",
    }
    filtered = {key: value for key, value in source.items() if key in allowed}
    return asdict(LLMExtraction.from_obj(filtered))
def _as_float(v: Any) -> Optional[float]:
try:
x = float(v)
except Exception:
return None
if x < 0:
return None
if x > 1.0:
return 1.0
return x
def _as_str_list(v: Any) -> List[str]:
if v is None:
return []
if isinstance(v, list):
return [str(x).strip() for x in v if str(x).strip()]
s = str(v).strip()
return [s] if s else []
def _cache_get_sqlite(con: Optional[sqlite3.Connection], cache_key: str) -> Optional[Dict[str, Any]]:
if con is None:
return None
try:
row = con.execute("SELECT result_json FROM llm_cache WHERE cache_key=?", (cache_key,)).fetchone()
if row and row["result_json"]:
return json.loads(row["result_json"])
except Exception:
return None
return None
def _cache_put_sqlite(
con: Optional[sqlite3.Connection],
cache_key: str,
model: str,
data: Dict[str, Any],
) -> None:
if con is None:
return
try:
con.execute(
"INSERT OR REPLACE INTO llm_cache(cache_key, model, result_json) VALUES (?,?,?)",
(cache_key, model, json.dumps(data, ensure_ascii=False)),
)
except Exception:
return