Files
tg_resume_db/pipeline.py
2026-03-11 15:27:10 +03:00

1991 lines
69 KiB
Python

from __future__ import annotations
import json
import os
import re
import shutil
import sqlite3
import subprocess
import uuid
from dataclasses import asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from tg_resume_db.util import Logger, utc_iso
from tg_resume_db.extract.text_extract import extract_text as extract_text_generic
from tg_resume_db.extract.clean import normalize_text, to_fts_text
from tg_resume_db.extract.pdf_extract import extract_pdf_best
from tg_resume_db.extract.llm import (
LLMExtraction,
llm_extract_profile,
llm_parse_enabled,
llm_review_profile,
)
from tg_resume_db.extract.doc_type import detect_doc_type
from tg_resume_db.extract.sections import split_sections, sections_present
from tg_resume_db.extract.experience_timeline import extract_positions, positions_to_dicts
from tg_resume_db.extract.parse import (
extract_contacts as extract_contacts_raw,
extract_name_guess,
extract_remote,
extract_english,
extract_salary,
extract_location_best_effort,
extract_experience_years, # Updated function
norm_pipe,
safe_json,
)
from tg_resume_db.extract.templates import generic as tpl_generic
from tg_resume_db.extract.templates import hh_ru as tpl_hh
from tg_resume_db.extract.templates import linkedin as tpl_linkedin
from tg_resume_db.extract.templates import one_page_en as tpl_one_page_en
from tg_resume_db.extract.templates import one_page_ru as tpl_one_page_ru
from tg_resume_db.extract.templates import pptx_export as tpl_pptx
from tg_resume_db.normalize import (
normalize_skills,
normalize_roles,
split_skills_primary_secondary,
normalize_location,
)
from tg_resume_db.dedup.simhash import (
sha256_file,
sha1_str,
simhash64,
simhash_bands,
hamming64,
)
from tg_resume_db.importers.telegram_json import find_result_json, iter_artifacts as iter_json_artifacts
from tg_resume_db.importers.telegram_html import find_messages_html, iter_artifacts as iter_html_artifacts
from tg_resume_db.importers.file_scan import iter_files as iter_file_scan
# Version tag recorded with each parsed document; bump when extraction logic
# changes so downstream consumers can detect and reprocess stale rows.
_PARSE_VERSION = "v3_llm_review"
# -----------------------------
# helpers: make everything text
# -----------------------------
def coerce_text(x: Any) -> str:
    """Turn Telegram-export weird structures (dict/list/bytes) into plain text.

    Handles None, str, bytes (tries several encodings), Telegram "rich text"
    lists (dict items contribute their "text" or "href"), and dicts (prefers
    "text", then "content", else JSON-serializes). Any other value is str()ed.
    """
    if x is None:
        return ""
    if isinstance(x, str):
        return x
    if isinstance(x, bytes):
        # Decode strictly so the non-UTF-8 fallbacks are actually reachable;
        # the old errors="ignore" made the first utf-8 attempt always succeed,
        # silently dropping bytes of cp1251/utf-16 content.
        for enc in ("utf-8", "utf-16", "cp1251", "latin-1"):
            try:
                return x.decode(enc)
            except Exception:
                pass
        # Last resort: lossy utf-8 (should be unreachable, latin-1 never fails).
        return x.decode("utf-8", errors="ignore")
    if isinstance(x, list):
        parts: List[str] = []
        for item in x:
            if isinstance(item, dict):
                # Telegram rich-text entity: prefer visible text, else the link.
                parts.append(coerce_text(item.get("text") or item.get("href") or ""))
            else:
                parts.append(coerce_text(item))
        return "".join(parts)
    if isinstance(x, dict):
        if "text" in x:
            return coerce_text(x["text"])
        if "content" in x:
            return coerce_text(x["content"])
        return json.dumps(x, ensure_ascii=False)
    return str(x)
# -----------------------------
# PDF extraction: prefer pdftotext
# -----------------------------
def _which_pdftotext() -> Optional[str]:
if os.environ.get("PDFTOTEXT_ENABLE", "0").lower() not in ("1", "true", "yes"):
return None
exe = shutil.which("pdftotext") or shutil.which("pdftotext.exe")
return exe
def extract_text_from_pdf_pdftotext(fp: Path, timeout_sec: int = 25) -> str:
    """Extract text from a PDF via the external pdftotext tool.

    Returns "" when the tool is disabled/unavailable, times out, or fails
    for any other reason (best-effort, never raises).
    """
    exe = _which_pdftotext()
    if not exe:
        return ""
    try:
        proc = subprocess.run(
            [exe, "-layout", "-nopgbrk", str(fp), "-"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout_sec,
            check=False,
            text=True,
            encoding="utf-8",
            errors="ignore",
        )
    except Exception:
        # Covers subprocess.TimeoutExpired and launch failures alike.
        return ""
    return (proc.stdout or "").strip()
def extract_text_resilient(fp: Path, log: Optional[Logger] = None, timeout_sec: int = 25) -> str:
    """Extract plain text from a file without ever raising.

    PDFs first try the external pdftotext tool (usually preserves layout
    better), then fall back to the generic extractor. Any extraction failure
    is logged as a warning (when a logger is given) and yields "".
    """
    ext = fp.suffix.lower()
    if ext == ".pdf":
        out = extract_text_from_pdf_pdftotext(fp, timeout_sec=timeout_sec)
        if out:
            return out
        # pdftotext unavailable or produced nothing: try the generic path.
        try:
            return extract_text_generic(fp) or ""
        except Exception as e:
            if log:
                log.warn("[extract] pdf failed - skipped", {"file": str(fp), "err": repr(e)})
            return ""
    try:
        return extract_text_generic(fp) or ""
    except Exception as e:
        if log:
            log.warn("[extract] file failed - skipped", {"file": str(fp), "err": repr(e)})
        return ""
# -----------------------------
# contacts normalization + phone/tg cleanup
# -----------------------------
# Email addresses (conservative pattern; local part capped at 64 chars).
_EMAIL_RE = re.compile(r"\b[a-zA-Z0-9._%+\-]{1,64}@[a-zA-Z0-9.\-]{1,253}\.[a-zA-Z]{2,}\b")
# Emails whose local part was split by whitespace during text extraction,
# e.g. "john doe@x.com" -> prefix "john" + tail "doe@x.com".
_EMAIL_SPLIT_RE = re.compile(
    r"(?<![@\w])(?P<prefix>[a-z0-9][a-z0-9._%+\-]{1,40})\s+"
    r"(?P<tail>[a-z0-9][a-z0-9._%+\-]{0,40}@[a-z0-9.\-]+\.[a-z]{2,})",
    re.I,
)
# Telegram handles: "@name" mentions and t.me / telegram.me links (5-32 chars).
_TG_AT_RE = re.compile(r"(?<![\w@])@([A-Za-z0-9_]{5,32})(?!\w)")
_TG_LINK_RE = re.compile(r"(?:https?://)?(?:t\.me|telegram\.me)/([A-Za-z0-9_]{5,32})(?!\w)")
# Phone-like digit runs that may contain (), -, dots and spaces between digits.
_PHONE_CHUNK_RE = re.compile(r"(?<!\d)(\+?\d[\d()\-\s.]{7,}\d)(?!\d)")
# "@word" tokens that are common CV vocabulary, not real Telegram usernames.
_TG_STOP = {
    "gmail", "email", "mail", "phone", "telegram", "linkedin", "github", "resume", "cv",
    "backend", "frontend", "fullstack", "ios", "android", "qa",
    "senior", "middle", "junior", "lead", "teamlead", "developer",
    "community", "release"
}
# Label words preceding an email that must not be glued into the local part.
_EMAIL_PREFIX_STOP = {
    "email", "mail", "contact", "contacts", "phone", "tel", "telegram", "linkedin", "github",
}
def _norm_email(s: str) -> Optional[str]:
    """Trim/lowercase an email candidate; None when it is not well-formed."""
    candidate = s.strip().lower()
    return candidate if _EMAIL_RE.fullmatch(candidate) else None
def _recover_split_emails(text: str) -> List[str]:
    """Reassemble emails whose local part was broken by a space.

    Text extraction sometimes yields "john doe@x.com"; this rejoins prefix and
    tail into "johndoe@x.com" when the prefix plausibly belongs to the address
    (and is not a label such as "email:").
    """
    out: List[str] = []
    for m in _EMAIL_SPLIT_RE.finditer(text or ""):
        prefix = (m.group("prefix") or "").strip().lower().strip(".-_")
        if not prefix or prefix in _EMAIL_PREFIX_STOP:
            continue
        # Require a separator or digit in the prefix: plain words are most
        # likely ordinary text rather than a broken email local part.
        if not re.search(r"[._\-\d]", prefix):
            continue
        tail = (m.group("tail") or "").strip().lower()
        if "@" not in tail:
            continue
        local_tail, domain = tail.split("@", 1)
        local = f"{prefix}{local_tail}"
        if len(local) > 64:  # standard local-part length cap
            continue
        cand = f"{local}@{domain}"
        if _EMAIL_RE.fullmatch(cand):
            out.append(cand)
    return out
def _prune_fragment_emails(values: List[str]) -> List[str]:
uniq = sorted(set(v.lower().strip() for v in values if v and "@" in v))
out: List[str] = []
for e in uniq:
local, domain = e.split("@", 1)
drop = False
for other in uniq:
if other == e:
continue
ol, od = other.split("@", 1)
if od != domain:
continue
if len(local) <= 8 and len(ol) > len(local) + 2 and ol.endswith(local) and re.search(r"[._\-]", ol):
drop = True
break
if not drop:
out.append(e)
return out
def _looks_like_month_range(digits: str) -> bool:
if len(digits) == 12:
try:
mm1 = int(digits[0:2]); yyyy1 = int(digits[2:6])
mm2 = int(digits[6:8]); yyyy2 = int(digits[8:12])
if 1 <= mm1 <= 12 and 1900 <= yyyy1 <= 2100 and 1 <= mm2 <= 12 and 1900 <= yyyy2 <= 2100:
return True
except Exception:
return False
return False
def _norm_phone(s: str) -> Optional[str]:
    """Normalize a phone-looking chunk to "+<digits>"; None when implausible."""
    raw = s.strip()
    # Only accept numbers that look intentional: "+..." or RU-style 7/8 prefix.
    if raw[:1] not in ("+", "7", "8"):
        return None
    digits = re.sub(r"\D+", "", raw)
    if not (10 <= len(digits) <= 15):
        return None
    # Repeated-digit junk such as "8888888888".
    if len(set(digits)) <= 2:
        return None
    # 12 digits shaped like MMYYYYMMYYYY are a date range, not a phone.
    if _looks_like_month_range(digits):
        return None
    # 12-digit numbers stuffed with zeros are usually IDs/timestamps.
    if len(digits) == 12 and digits.startswith(("2", "3", "4", "5", "6", "7", "8", "9")):
        if digits.count("0") >= 6:
            return None
    return "+" + digits
def _norm_tg_handle(handle: str) -> Optional[str]:
    """Normalize a Telegram handle; None when invalid or a known false positive."""
    h = handle.strip().lstrip("@").lower()
    valid = (
        5 <= len(h) <= 32
        and re.fullmatch(r"[a-z0-9_]+", h) is not None
        and not h.isdigit()
        and h not in _TG_STOP
    )
    return h if valid else None
def normalize_contacts(raw: Any, clean_text: str) -> Dict[str, List[str]]:
    """Collect and normalize contact info from parser output plus raw CV text.

    `raw` is a heuristic-parser dict whose keys may use several spellings;
    `clean_text` is additionally scanned for emails (including space-broken
    ones), phone-like chunks and Telegram handles/links. Each channel is
    validated, normalized and de-duplicated (first-seen order preserved).
    """
    out: Dict[str, List[str]] = {"email": [], "phone": [], "tg": [], "github": [], "linkedin": []}
    if isinstance(raw, dict):
        # Accept both singular and plural key spellings from the raw parser.
        key_map = {
            "emails": "email", "email": "email",
            "phones": "phone", "phone": "phone",
            "telegram": "tg", "tg": "tg",
            "github": "github",
            "linkedin": "linkedin",
        }
        for k, v in raw.items():
            nk = key_map.get(k)
            if not nk:
                continue
            vals = [coerce_text(x) for x in v] if isinstance(v, list) else [coerce_text(v)]
            out[nk].extend(vals)
    # Mine the clean text as well: raw parser output is often incomplete.
    for e in _EMAIL_RE.findall(clean_text):
        out["email"].append(e)
    for e in _recover_split_emails(clean_text):
        out["email"].append(e)
    for chunk in _PHONE_CHUNK_RE.findall(clean_text):
        out["phone"].append(chunk)
    for h in _TG_AT_RE.findall(clean_text):
        out["tg"].append(h)
    for h in _TG_LINK_RE.findall(clean_text):
        out["tg"].append(h)

    def uniq(seq: List[str]) -> List[str]:
        # Order-preserving de-duplication.
        seen = set()
        res = []
        for x in seq:
            if x in seen:
                continue
            seen.add(x)
            res.append(x)
        return res

    emails: List[str] = []
    for s in out["email"]:
        n = _norm_email(s)
        if n:
            emails.append(n)
    phones: List[str] = []
    for s in out["phone"]:
        n = _norm_phone(s)
        if n:
            phones.append(n)
    tgs: List[str] = []
    for s in out["tg"]:
        n = _norm_tg_handle(s)
        if n:
            tgs.append(n)
    out["email"] = uniq(_prune_fragment_emails(emails))
    out["phone"] = uniq(phones)
    out["tg"] = uniq(tgs)
    # github/linkedin values are kept as-is (trimmed), only de-duplicated.
    out["github"] = uniq([coerce_text(x).strip() for x in out["github"] if coerce_text(x).strip()])
    out["linkedin"] = uniq([coerce_text(x).strip() for x in out["linkedin"] if coerce_text(x).strip()])
    return out
# -----------------------------
# LLM helpers
# -----------------------------
# Canonical programming-language tokens recognized as "primary languages".
_LANGUAGE_CANON = {
    "python",
    "java",
    "kotlin",
    "go",
    "golang",
    "c++",
    "cpp",
    "c#",
    "javascript",
    "typescript",
    "ruby",
    "php",
    "swift",
    "objective-c",
    "scala",
    "rust",
    "dart",
}
# Spelling variants mapped onto their canonical token.
_LANGUAGE_ALIAS = {
    "golang": "go",
    "cpp": "c++",
    "c plus plus": "c++",
    "csharp": "c#",
    "c#": "c#",
    "js": "javascript",
    "ts": "typescript",
}
# Signals that "java" genuinely means Java: versions and JVM toolchain words.
_JAVA_REAL_RE = re.compile(r"\b(java\s*(8|11|17|21)|spring|jvm|maven|gradle|jakarta)\b", re.I)
# Mentions of JavaScript, including "java script" and bare "js".
_JAVASCRIPT_RE = re.compile(r"\b(java\s*script|javascript|js)\b", re.I)
def _norm_lang_token(token: str) -> Optional[str]:
    """Normalize one programming-language token to its canonical form.

    Returns e.g. "go" for "golang" and "c++" for "cpp"; None when the token
    is empty or not a recognized language.
    """
    raw = (token or "").strip().lower()
    if not raw:
        return None
    # The alias table already collapses golang->go, cpp->c++, js->javascript,
    # etc., so the old post-lookup collapse of "golang"/"cpp" was unreachable
    # and has been removed.
    norm = _LANGUAGE_ALIAS.get(raw, raw)
    return norm if norm in _LANGUAGE_CANON else None
def _normalize_language_list(values: List[str]) -> List[str]:
    """Map tokens through _norm_lang_token, dropping invalid entries and dupes."""
    result: List[str] = []
    seen_tokens = set()
    for raw in values or []:
        lang = _norm_lang_token(raw)
        if lang is None or lang in seen_tokens:
            continue
        seen_tokens.add(lang)
        result.append(lang)
    return result
def _drop_false_java(
    skills: List[str],
    primary_languages: List[str],
    clean_text: str,
) -> Tuple[List[str], List[str]]:
    """Remove a "java" skill that was really a JavaScript mention.

    When the CV text mentions JavaScript but shows no genuine Java signals
    (Spring, JVM, Maven, ...), "java" is stripped from both lists; otherwise
    the inputs are returned unchanged.
    """
    lowered = {str(s).strip().lower() for s in (skills or [])}
    if "java" not in lowered:
        return skills, primary_languages

    def without_java(items: List[str]) -> List[str]:
        return [s for s in items if str(s).strip().lower() != "java"]

    txt = clean_text or ""
    if _JAVASCRIPT_RE.search(txt) and not _JAVA_REAL_RE.search(txt):
        return without_java(skills), without_java(primary_languages)
    return skills, primary_languages
def _roles_from_desired_title(title: Optional[str]) -> List[str]:
if not title:
return []
t = title.lower()
out: List[str] = []
if "backend" in t or "бэкенд" in t or "бекенд" in t:
out.append("backend")
if "frontend" in t or "фронтенд" in t:
out.append("frontend")
if "fullstack" in t or "full stack" in t or "фулстек" in t:
out.append("fullstack")
if "devops" in t or "sre" in t:
out.append("devops")
if "qa" in t or "test" in t or "тестировщик" in t:
out.append("qa")
if "data" in t or "ml" in t or "machine learning" in t or "аналитик" in t:
out.append("data")
if "android" in t or "ios" in t or "mobile" in t or "мобиль" in t:
out.append("mobile")
return out
def _merge_lists(base: List[str], extra: List[str], limit: Optional[int] = None) -> List[str]:
seen = set()
out: List[str] = []
for seq in (base or [], extra or []):
for x in seq:
t = str(x).strip()
if not t or t.lower() in seen:
continue
seen.add(t.lower())
out.append(t)
if limit is not None and len(out) >= limit:
return out
return out
def _pick_salary(
heur_min: Optional[int],
heur_max: Optional[int],
heur_conf: Optional[float],
llm_min: Optional[int],
llm_max: Optional[int],
) -> Tuple[Optional[int], Optional[int], Optional[float]]:
if heur_min or heur_max:
if heur_conf is None:
heur_conf = 0.55
return heur_min, heur_max, heur_conf
if llm_min or llm_max:
return llm_min, llm_max, 0.65
return heur_min, heur_max, heur_conf
# Keywords showing the CV explicitly discusses English proficiency
# (language names, test names, CEFR codes); Russian stem included.
_EN_SIGNAL_RE = re.compile(r"\b(english|англий|ielts|toefl|cefr|a1|a2|b1|b2|c1|c2)\b", re.I)


def _has_english_signal(text: str) -> bool:
    """True when the text explicitly mentions English / CEFR / test keywords."""
    if not text:
        return False
    return _EN_SIGNAL_RE.search(text) is not None


def _can_accept_llm_english(clean_text: str, level: Optional[str]) -> bool:
    """Accept an LLM-proposed English level only when the CV mentions English at all."""
    if not level:
        return False
    # Require explicit language signal in CV to avoid invented C1/C2.
    return _has_english_signal(clean_text)
# Roles that must be backed by matching keywords in the CV text to be kept;
# roles not listed here pass through unchecked.
_ROLE_EVIDENCE_PATTERNS: Dict[str, re.Pattern] = {
    "qa": re.compile(r"\b(qa|quality assurance|tester|test engineer|test automation)\b", re.I),
    "devops": re.compile(r"\b(devops|dev ops|sre|platform engineer|infrastructure engineer)\b", re.I),
    "mobile": re.compile(r"\b(mobile|android|ios|react native|flutter)\b", re.I),
    "data": re.compile(r"\b(data engineer|data scientist|ml engineer|machine learning)\b", re.I),
    "architect": re.compile(r"\b(architect|solution architect|software architect)\b", re.I),
}


def _prune_roles_by_evidence(roles: List[str], clean_text: str) -> List[str]:
    """Normalize/de-duplicate roles and drop those lacking textual evidence.

    Roles present in _ROLE_EVIDENCE_PATTERNS are kept only when the pattern
    matches the (lowercased) CV text.
    """
    out: List[str] = []
    seen = set()
    t = (clean_text or "").lower()
    for role in roles or []:
        r = str(role).strip().lower()
        if not r or r in seen:
            continue
        seen.add(r)
        pat = _ROLE_EVIDENCE_PATTERNS.get(r)
        if pat is not None and not pat.search(t):
            continue
        out.append(r)
    return out
def _parse_ym(date_iso: Optional[str]) -> Optional[Tuple[int, int]]:
if not date_iso:
return None
m = re.match(r"^\s*(\d{4})-(\d{2})", str(date_iso).strip())
if not m:
return None
y = int(m.group(1))
mm = int(m.group(2))
if not (1900 <= y <= 2100 and 1 <= mm <= 12):
return None
return (y, mm)
def _months_between(a: Tuple[int, int], b: Tuple[int, int]) -> int:
return (b[0] - a[0]) * 12 + (b[1] - a[1])
def _experience_years_from_positions(position_dicts: List[Dict[str, Any]]) -> Optional[float]:
    """Total years of experience from dated positions, counting overlaps once.

    Positions with parseable "date_from"/"date_to" months are merged as
    intervals (a sort-and-sweep merge) and the merged months summed. Returns
    None when no position has both dates or the result exceeds 60 years.
    """
    intervals: List[Tuple[Tuple[int, int], Tuple[int, int]]] = []
    for p in position_dicts or []:
        if not isinstance(p, dict):
            continue
        a = _parse_ym(p.get("date_from"))
        b = _parse_ym(p.get("date_to"))
        if not a or not b:
            continue
        if b < a:
            # Tolerate swapped from/to dates by reordering.
            a, b = b, a
        intervals.append((a, b))
    if not intervals:
        return None
    # Sort by start month, then merge overlapping intervals in one pass.
    intervals.sort(key=lambda x: x[0])
    merged: List[Tuple[Tuple[int, int], Tuple[int, int]]] = [intervals[0]]
    for s, e in intervals[1:]:
        ls, le = merged[-1]
        if s <= le:
            if e > le:
                merged[-1] = (ls, e)
        else:
            merged.append((s, e))
    months = 0
    for s, e in merged:
        months += max(0, _months_between(s, e))
    years = round(months / 12.0, 2)
    if 0.0 <= years <= 60.0:
        return years
    return None
def _reconcile_experience_fields(
    *,
    exp_years: Optional[float],
    exp_years_eng: Optional[float],
    exp_conf: Optional[float],
    exp_dbg: Dict[str, Any],
    positions: List[Dict[str, Any]],
) -> Tuple[Optional[float], Optional[float], Optional[float], Dict[str, Any]]:
    """Cross-check text-derived experience against the dated position timeline.

    Uses years computed from positions to fill a missing total or bump a
    clearly understated one (unless a confident explicit summary says
    otherwise), repairs an engineering-years figure that exceeds the total,
    and fills engineering years from the total for non-recruiter profiles.
    Every adjustment is recorded in the returned debug dict ("reconcile").
    """
    dbg = dict(exp_dbg or {})
    source_notes: List[str] = []
    pos_years = _experience_years_from_positions(positions)
    if pos_years is not None:
        dbg["positions_years"] = pos_years
    if exp_years is None and pos_years is not None:
        exp_years = pos_years
        exp_conf = max(float(exp_conf or 0.0), 0.74)
        source_notes.append("positions_fallback")
    elif exp_years is not None and pos_years is not None and pos_years > (float(exp_years) + 1.0):
        # Timeline says noticeably more than the summary text did.
        method = str(dbg.get("method") or "")
        strong_summary = method in ("summary", "header_chunk") and float(exp_conf or 0.0) >= 0.78
        if strong_summary and (pos_years - float(exp_years)) > 1.5:
            # A confident explicit summary outranks the timeline estimate.
            source_notes.append("positions_reconcile_skip_strong_summary")
        else:
            exp_years = pos_years
            exp_conf = max(float(exp_conf or 0.0), 0.75)
            source_notes.append("positions_reconcile_up")
    # Prevent impossible split like total=1.5 while engineering=7.0.
    try:
        if exp_years is not None and exp_years_eng is not None:
            if float(exp_years) < float(exp_years_eng) * 0.7:
                exp_years = float(exp_years_eng)
                exp_conf = max(float(exp_conf or 0.0), 0.74)
                source_notes.append("eng_gt_total_fix")
    except Exception:
        pass
    is_recruiter = bool(dbg.get("is_recruiter"))
    if exp_years_eng is None and exp_years is not None and not is_recruiter:
        # Non-recruiters: assume all experience is engineering experience.
        exp_years_eng = float(exp_years)
        source_notes.append("eng_fill_from_total")
    if source_notes:
        dbg["reconcile"] = source_notes
    return exp_years, exp_years_eng, exp_conf, dbg
def _prefer_explicit_summary_experience(
    *,
    clean_text: str,
    exp_years: Optional[float],
    exp_years_eng: Optional[float],
    exp_conf: Optional[float],
    exp_dbg: Dict[str, Any],
) -> Tuple[Optional[float], Optional[float], Optional[float], Dict[str, Any]]:
    """Re-run experience extraction on the clean text, preferring an explicit summary.

    Fills a missing total outright from the clean-text result. When the clean
    text yields a confident (>= 0.78) "summary"/"header_chunk" figure that is
    at least 1.5 years below a timeline-derived total, the explicit summary
    wins (timeline estimates tend to over-count).
    """
    try:
        clean_total, clean_eng, clean_conf, clean_dbg = extract_experience_years(clean_text or "")
    except Exception:
        # Extraction is best-effort: on failure keep the original values.
        return exp_years, exp_years_eng, exp_conf, exp_dbg
    if clean_total is None:
        return exp_years, exp_years_eng, exp_conf, exp_dbg
    if exp_years is None:
        merged_dbg = dict(exp_dbg or {})
        merged_dbg["clean_exp_method"] = (clean_dbg or {}).get("method")
        return clean_total, (clean_eng if clean_eng is not None else exp_years_eng), max(float(exp_conf or 0.0), float(clean_conf or 0.0)), merged_dbg
    parsed_method = str((exp_dbg or {}).get("method") or "")
    clean_method = str((clean_dbg or {}).get("method") or "")
    if clean_conf is not None and clean_conf >= 0.78 and clean_method in ("summary", "header_chunk"):
        try:
            if parsed_method.startswith("timeline") and float(clean_total) + 1.5 < float(exp_years):
                merged_dbg = dict(exp_dbg or {})
                merged_dbg["clean_exp_method"] = clean_method
                merged_dbg["reconcile_clean"] = "prefer_explicit_summary"
                return clean_total, (clean_eng if clean_eng is not None else exp_years_eng), max(float(exp_conf or 0.0), float(clean_conf or 0.0)), merged_dbg
        except Exception:
            pass
    return exp_years, exp_years_eng, exp_conf, exp_dbg
def _need_llm_fallback(
*,
roles: List[str],
skills: List[str],
exp_conf: Optional[float],
english: Optional[str],
location: Optional[str],
name: Optional[str],
doc_type: Optional[str],
) -> bool:
if doc_type == "scan_pdf":
return False
if not name:
return True
if not roles and len(skills) < 2:
return True
if exp_conf is None or exp_conf < 0.4:
return True
if not english and not location and len(skills) < 2:
return True
return False
def _maybe_llm_enrich(
    *,
    con: sqlite3.Connection,
    clean: str,
    roles: List[str],
    skills: List[str],
    exp_conf: Optional[float],
    english: Optional[str],
    location: Optional[str],
    name: Optional[str],
    doc_type: Optional[str],
    sections: Optional[Dict[str, str]],
) -> Tuple[Optional[LLMExtraction], Dict[str, Any]]:
    """
    LLM runs only as fallback when heuristics are weak,
    unless forced via LLM_PARSE_FORCE=1.

    Returns (extraction_or_None, debug_dict); the debug dict records whether
    the LLM path was enabled, forced, and actually used.
    """
    if not llm_parse_enabled():
        return None, {"enabled": False}
    forced = os.environ.get("LLM_PARSE_FORCE", "0").lower() in ("1", "true", "yes")
    if not forced and not _need_llm_fallback(
        roles=roles,
        skills=skills,
        exp_conf=exp_conf,
        english=english,
        location=location,
        name=name,
        doc_type=doc_type,
    ):
        # Heuristic extraction was good enough; skip the expensive LLM call.
        return None, {"enabled": True, "forced": False, "used": False, "reason": "heuristics_ok"}
    llm_res, llm_dbg = llm_extract_profile(
        clean,
        con=con,
        doc_type=doc_type,
        sections=sections,
    )
    if isinstance(llm_dbg, dict):
        llm_dbg["forced"] = forced
        llm_dbg["used"] = bool(llm_res)
    return llm_res, llm_dbg
# CEFR ladder used to compare English levels (higher number = better level).
_EN_ORDER = {"A1": 1, "A2": 2, "B1": 3, "B2": 4, "C1": 5, "C2": 6}
def _llm_review_mode() -> str:
mode = (os.environ.get("LLM_PARSE_REVIEW_MODE", "always") or "").strip().lower()
if mode in ("0", "false", "no", "off"):
return "off"
if mode in ("auto", "smart", "on_demand"):
return "auto"
return "always"
def _llm_review_rounds() -> int:
raw = (os.environ.get("LLM_PARSE_REVIEW_ROUNDS", "1") or "").strip()
try:
rounds = int(raw)
except Exception:
rounds = 1
return max(1, min(rounds, 3))
def _normalize_cefr(level: Optional[str]) -> Optional[str]:
if not level:
return None
m = re.search(r"\b(A1|A2|B1|B2|C1|C2)\b", str(level).upper())
return m.group(1) if m else None
def _bounded_float(v: Any, lo: float, hi: float) -> Optional[float]:
try:
x = float(v)
except Exception:
return None
if x < lo or x > hi:
return None
return float(round(x, 2))
def _bounded_int(v: Any, lo: int, hi: int) -> Optional[int]:
try:
x = int(float(v))
except Exception:
return None
if x < lo or x > hi:
return None
return x
def _llm_review_needed(
*,
mode: str,
llm_enriched_used: bool,
name: Optional[str],
roles: List[str],
skills: List[str],
exp_conf: Optional[float],
english: Optional[str],
location: Optional[str],
) -> bool:
if mode == "off":
return False
if mode == "always":
return True
if llm_enriched_used:
return True
if not name:
return True
if not roles or len(skills) < 3:
return True
if exp_conf is None or exp_conf < 0.65:
return True
if not english or not location:
return True
return False
def _build_llm_review_draft(
    *,
    roles: List[str],
    skills: List[str],
    primary_languages: List[str],
    seniority: Optional[str],
    backend_focus: Optional[bool],
    exp_years: Optional[float],
    exp_years_eng: Optional[float],
    english: Optional[str],
    location: Optional[str],
    remote: Optional[bool],
    sal_min: Optional[int],
    sal_max: Optional[int],
    highlights: List[str],
    keywords: List[str],
) -> Dict[str, Any]:
    """Assemble the draft profile dict handed to the LLM reviewer.

    List fields are truncated to sane sizes and the English level is
    normalized to a CEFR code before shipping.
    """
    draft: Dict[str, Any] = {}
    draft["roles"] = roles[:12]
    draft["skills"] = skills[:64]
    draft["primary_languages"] = primary_languages[:12]
    draft["seniority"] = seniority
    draft["backend_focus"] = backend_focus
    draft["experience_years_total"] = exp_years
    draft["experience_years_engineering"] = exp_years_eng
    draft["english_level"] = _normalize_cefr(english)
    draft["location"] = location
    draft["remote_ok"] = remote
    draft["salary_min_rub"] = sal_min
    draft["salary_max_rub"] = sal_max
    draft["highlights"] = highlights[:8]
    draft["keywords"] = keywords[:40]
    return draft
def _merge_review_result(
    *,
    review: LLMExtraction,
    review_dbg: Dict[str, Any],
    roles: List[str],
    skills: List[str],
    primary_languages: List[str],
    seniority: Optional[str],
    backend_focus: Optional[bool],
    remote: Optional[bool],
    location: Optional[str],
    english: Optional[str],
    exp_years: Optional[float],
    exp_years_eng: Optional[float],
    exp_conf: Optional[float],
    sal_min: Optional[int],
    sal_max: Optional[int],
    sal_conf: Optional[float],
    highlights: List[str],
    keywords: List[str],
    llm_summary: Optional[str],
    llm_tags: List[str],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Merge the LLM reviewer's output into the current profile fields.

    Returns (merged_fields, merge_debug). The review is "trusted" when its
    self-reported quality_score is missing or >= 0.55: trusted output leads
    list merges and may override scalars; untrusted output only fills gaps.
    Every effective change is recorded in merge_debug["changed_fields"].
    """
    quality = review_dbg.get("quality_score")
    try:
        quality_f = float(quality) if quality is not None else None
    except Exception:
        quality_f = None
    # Trust the review unless it explicitly reports a low quality score.
    trusted = quality_f is None or quality_f >= 0.55
    changed: List[str] = []
    # Fields the reviewer model itself claims it changed; lets a trusted
    # review replace roles / primary_languages wholesale instead of merging.
    model_changed_raw = review_dbg.get("model_changed_fields") or []
    model_changed = set()
    if isinstance(model_changed_raw, list):
        for x in model_changed_raw:
            s = str(x).strip()
            if s:
                model_changed.add(s)
    # --- roles: trusted review goes first in the merge (its ordering wins) ---
    roles_out = list(roles or [])
    if review.roles:
        if trusted and "roles" in model_changed:
            merged_roles = _merge_lists(review.roles, [], limit=12)
        else:
            merged_roles = _merge_lists(review.roles, roles_out, limit=12) if trusted else _merge_lists(roles_out, review.roles, limit=12)
        if merged_roles != roles_out:
            changed.append("roles")
            roles_out = merged_roles
    # --- skills ---
    skills_out = list(skills or [])
    if review.skills:
        merged_skills = _merge_lists(review.skills, skills_out, limit=64) if trusted else _merge_lists(skills_out, review.skills, limit=64)
        if merged_skills != skills_out:
            changed.append("skills")
            skills_out = merged_skills
    # --- primary languages (canonicalized before merging) ---
    langs_out = list(primary_languages or [])
    review_langs = _normalize_language_list(review.primary_languages)
    if review_langs:
        if trusted and "primary_languages" in model_changed:
            merged_langs = _merge_lists(review_langs, [], limit=12)
        else:
            merged_langs = _merge_lists(review_langs, langs_out, limit=12) if trusted else _merge_lists(langs_out, review_langs, limit=12)
        if merged_langs != langs_out:
            changed.append("primary_languages")
            langs_out = merged_langs
    # --- scalars: a trusted review may override, otherwise only fill gaps ---
    seniority_out = seniority
    if review.seniority and (trusted or not seniority_out):
        if review.seniority != seniority_out:
            changed.append("seniority")
        seniority_out = review.seniority
    backend_focus_out = backend_focus
    if review.backend_focus is not None and (trusted or backend_focus_out is None):
        if review.backend_focus != backend_focus_out:
            changed.append("backend_focus")
        backend_focus_out = review.backend_focus
    remote_out = remote
    if review.remote_ok is not None and (trusted or remote_out is None):
        if review.remote_ok != remote_out:
            changed.append("remote")
        remote_out = review.remote_ok
    location_out = location
    if review.location and (trusted or not location_out):
        loc = review.location.strip()
        # Sanity-bound the location string length before accepting it.
        if 2 <= len(loc) <= 120 and loc != (location_out or ""):
            changed.append("location")
            location_out = loc
    # --- English: fill when missing; a trusted review may only upgrade ---
    english_out = _normalize_cefr(english)
    review_english = _normalize_cefr(review.english_level)
    if review_english:
        if english_out is None:
            english_out = review_english
            changed.append("english")
        elif trusted and _EN_ORDER.get(review_english, 0) > _EN_ORDER.get(english_out, 0):
            english_out = review_english
            changed.append("english")
    # --- experience: bounded to 0..60; trusted review overrides low-confidence values ---
    exp_years_out = exp_years
    exp_years_eng_out = exp_years_eng
    exp_conf_out = exp_conf
    review_exp_total = _bounded_float(review.experience_years_total, 0.0, 60.0)
    review_exp_eng = _bounded_float(review.experience_years_engineering, 0.0, 60.0)
    if review_exp_total is not None:
        if exp_years_out is None or (trusted and ((exp_conf_out or 0.0) < 0.75)):
            if exp_years_out != review_exp_total:
                changed.append("experience_years_total")
            exp_years_out = review_exp_total
            exp_conf_out = max(float(exp_conf_out or 0.0), 0.78 if trusted else 0.65)
    if review_exp_eng is not None:
        if exp_years_eng_out is None or trusted:
            if exp_years_eng_out != review_exp_eng:
                changed.append("experience_years_engineering")
            exp_years_eng_out = review_exp_eng
            exp_conf_out = max(float(exp_conf_out or 0.0), 0.74 if trusted else 0.62)
    # --- salary: prefer RUB bounds, fall back to USD; swap a reversed range ---
    sal_min_out = sal_min
    sal_max_out = sal_max
    sal_conf_out = sal_conf
    cand_min = _bounded_int(review.salary_min_rub, 10_000, 200_000_000)
    cand_max = _bounded_int(review.salary_max_rub, 10_000, 200_000_000)
    if cand_min is None and cand_max is None:
        cand_min = _bounded_int(review.salary_min_usd, 100, 2_000_000)
        cand_max = _bounded_int(review.salary_max_usd, 100, 2_000_000)
    if cand_min is not None or cand_max is not None:
        if cand_min is not None and cand_max is not None and cand_min > cand_max:
            cand_min, cand_max = cand_max, cand_min
        if (sal_min_out is None and sal_max_out is None) or (trusted and (sal_conf_out is None or sal_conf_out < 0.75)):
            if cand_min is not None and cand_min != sal_min_out:
                sal_min_out = cand_min
                changed.append("salary")
            if cand_max is not None and cand_max != sal_max_out:
                sal_max_out = cand_max
                changed.append("salary")
            sal_conf_out = max(float(sal_conf_out or 0.0), 0.72 if trusted else 0.60)
    # --- highlights / keywords ---
    highlights_out = list(highlights or [])
    if review.highlights:
        merged_highlights = _merge_lists(review.highlights, highlights_out, limit=8) if trusted else _merge_lists(highlights_out, review.highlights, limit=8)
        if merged_highlights != highlights_out:
            highlights_out = merged_highlights
            changed.append("highlights")
    keywords_out = list(keywords or [])
    if review.keywords:
        merged_keywords = _merge_lists(review.keywords, keywords_out, limit=40) if trusted else _merge_lists(keywords_out, review.keywords, limit=40)
        if merged_keywords != keywords_out:
            keywords_out = merged_keywords
            changed.append("keywords")
    # Tags absorb keywords, skills and languages (first-seen order, cap 40).
    llm_tags_out = list(llm_tags or [])
    llm_tags_out = _merge_lists(keywords_out, llm_tags_out, limit=40)
    llm_tags_out = _merge_lists(skills_out, llm_tags_out, limit=40)
    llm_tags_out = _merge_lists(langs_out, llm_tags_out, limit=40)
    # Summary is rebuilt from the merged highlights (800-char cap).
    llm_summary_out = llm_summary
    if highlights_out:
        merged_summary = "; ".join([h.strip() for h in highlights_out if h.strip()])[:800]
        if merged_summary and merged_summary != (llm_summary_out or ""):
            llm_summary_out = merged_summary
            changed.append("llm_summary")
    # De-duplicate the changed-fields list, preserving first-seen order
    # ("salary" in particular may have been appended twice).
    changed_uniq = []
    changed_seen = set()
    for item in changed:
        if item in changed_seen:
            continue
        changed_seen.add(item)
        changed_uniq.append(item)
    return (
        {
            "roles": roles_out,
            "skills": skills_out,
            "primary_languages": langs_out,
            "seniority": seniority_out,
            "backend_focus": backend_focus_out,
            "remote": remote_out,
            "location": location_out,
            "english": english_out,
            "exp_years": exp_years_out,
            "exp_years_eng": exp_years_eng_out,
            "exp_conf": exp_conf_out,
            "sal_min": sal_min_out,
            "sal_max": sal_max_out,
            "sal_conf": sal_conf_out,
            "highlights": highlights_out,
            "keywords": keywords_out,
            "llm_summary": llm_summary_out,
            "llm_tags": llm_tags_out,
        },
        {
            "trusted": trusted,
            "quality_score": quality_f,
            "changed_fields": changed_uniq,
            "issues_found": review_dbg.get("issues_found") or [],
            "model_changed_fields": review_dbg.get("changed_fields") or [],
        },
    )
# -----------------------------
# candidate/resume DB helpers
# -----------------------------
def stable_candidate_id(contacts: Dict[str, List[str]], name: Optional[str], simh: int) -> str:
    """Derive a deterministic candidate id from the strongest identity signal.

    Preference order: email, phone, telegram, github, linkedin; when no
    contact is available, falls back to a (name, simhash) pair.
    """
    priority = (
        ("email", "email"),
        ("phone", "phone"),
        ("tg", "tg"),
        ("gh", "github"),
        ("li", "linkedin"),
    )
    for prefix, ctype in priority:
        vals = contacts.get(ctype)
        if vals:
            return "cand_" + sha1_str(f"{prefix}:{vals[0]}")
    base = (name or "unknown").strip().lower()
    return "cand_" + sha1_str(f"name:{base}:{simh}")
def _candidate_by_contact(con: sqlite3.Connection, contacts: Dict[str, List[str]]) -> Optional[str]:
checks = [
("email", contacts.get("email", [])),
("phone", contacts.get("phone", [])),
("tg", contacts.get("tg", [])),
("github", contacts.get("github", [])),
("linkedin", contacts.get("linkedin", [])),
]
for ctype, vals in checks:
for v in vals:
row = con.execute(
"SELECT candidate_id FROM candidate_contacts WHERE contact_type=? AND contact_value=?",
(ctype, v),
).fetchone()
if row:
return row["candidate_id"]
return None
def _upsert_contacts(con: sqlite3.Connection, candidate_id: str, contacts: Dict[str, List[str]]) -> None:
pairs: List[Tuple[str, str]] = []
for e in contacts.get("email", []):
pairs.append(("email", e))
for p in contacts.get("phone", []):
pairs.append(("phone", p))
for t in contacts.get("tg", []):
pairs.append(("tg", t))
for g in contacts.get("github", []):
pairs.append(("github", g))
for l in contacts.get("linkedin", []):
pairs.append(("linkedin", l))
for ctype, val in pairs:
con.execute(
"INSERT OR IGNORE INTO candidate_contacts(contact_type, contact_value, candidate_id) VALUES (?,?,?)",
(ctype, val, candidate_id),
)
def _upsert_candidate_skills(
con: sqlite3.Connection,
candidate_id: str,
skills_primary: List[str],
skills_secondary: List[str],
source: str,
) -> None:
for sk in skills_primary:
con.execute(
"""INSERT OR REPLACE INTO candidate_skills(candidate_id, skill_id, skill_label, confidence, source, evidence)
VALUES (?,?,?,?,?,?)""",
(candidate_id, sk, sk, 0.90, source, "skills_primary"),
)
for sk in skills_secondary:
con.execute(
"""INSERT OR REPLACE INTO candidate_skills(candidate_id, skill_id, skill_label, confidence, source, evidence)
VALUES (?,?,?,?,?,?)""",
(candidate_id, sk, sk, 0.60, source, "skills_secondary"),
)
def _upsert_candidate_roles(
con: sqlite3.Connection,
candidate_id: str,
roles: List[str],
source: str,
) -> None:
for r in roles:
con.execute(
"""INSERT OR REPLACE INTO candidate_roles(candidate_id, role, confidence, source, evidence)
VALUES (?,?,?,?,?)""",
(candidate_id, r, 0.80, source, "roles"),
)
def _upsert_candidate_languages(
con: sqlite3.Connection,
candidate_id: str,
english_level: Optional[str],
source: str,
) -> None:
if not english_level:
return
con.execute(
"""INSERT OR REPLACE INTO candidate_languages(candidate_id, language, level, confidence, source, evidence)
VALUES (?,?,?,?,?,?)""",
(candidate_id, "english", english_level, 0.75, source, "english_level"),
)
def _ensure_candidate(con: sqlite3.Connection, candidate_id: str, fields: Dict[str, Any]) -> None:
    """Insert a new candidate row or merge fields into an existing one.

    Merge semantics for an existing candidate:
      - scalar columns overwrite only when the new value is not NULL
        (COALESCE keeps the stored value otherwise);
      - JSON list columns overwrite only when the new JSON is non-empty
        (the '[]' sentinel preserves the stored list);
      - pipe-joined norm columns overwrite only when not the empty '|'
        sentinel.

    Also performs a best-effort schema upgrade for columns added after the
    original schema, so the pipeline keeps working on databases where the
    migration did not run.

    Args:
        con: open SQLite connection.
        candidate_id: stable candidate identifier.
        fields: extracted/normalized candidate fields (name, location,
            remote, experience, salary, roles, skills, ...).
    """
    # Best-effort schema upgrade: ALTER TABLE fails when the column already
    # exists, and that failure (or any other sqlite hiccup) is deliberately
    # swallowed so the upsert below can proceed.
    for ddl in (
        "ALTER TABLE candidates ADD COLUMN experience_years_eng REAL",
        "ALTER TABLE candidates ADD COLUMN primary_languages_json TEXT",
        "ALTER TABLE candidates ADD COLUMN backend_focus INTEGER",
    ):
        try:
            con.execute(ddl)
        except Exception:
            pass  # column likely exists already; proceed to insert/update
    exists = con.execute("SELECT 1 FROM candidates WHERE candidate_id=?", (candidate_id,)).fetchone() is not None
    primary_languages_json = safe_json(fields.get("primary_languages", []))
    backend_focus_field = fields.get("backend_focus")
    # Map the tri-state flag (True/False/None) to 1/0/NULL for storage.
    backend_focus_int = None if backend_focus_field is None else (1 if backend_focus_field else 0)
    if not exists:
        con.execute(
            """INSERT INTO candidates(
                candidate_id, name, location, remote,
                experience_years, experience_years_eng, experience_confidence,
                salary_min, salary_max, salary_confidence,
                english_level, roles_json, skills_json, primary_languages_json,
                roles_norm, skills_norm, backend_focus,
                created_at, updated_at
            ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                candidate_id,
                fields.get("name"),
                fields.get("location"),
                fields.get("remote"),
                fields.get("experience_years"),
                fields.get("experience_years_eng"),
                fields.get("experience_confidence"),
                fields.get("salary_min"),
                fields.get("salary_max"),
                fields.get("salary_confidence"),
                fields.get("english_level"),
                safe_json(fields.get("roles", [])),
                safe_json(fields.get("skills", [])),
                primary_languages_json,
                fields.get("roles_norm") or "|",
                fields.get("skills_norm") or "|",
                backend_focus_int,
                utc_iso(),
                utc_iso(),
            ),
        )
    else:
        # Each CASE needs the same JSON value three times (NULL test, '[]'
        # test, assignment), hence the repeated parameters below.
        con.execute(
            """UPDATE candidates SET
                name = COALESCE(?, name),
                location = COALESCE(?, location),
                remote = COALESCE(?, remote),
                experience_years = COALESCE(?, experience_years),
                experience_years_eng = COALESCE(?, experience_years_eng),
                experience_confidence = COALESCE(?, experience_confidence),
                salary_min = COALESCE(?, salary_min),
                salary_max = COALESCE(?, salary_max),
                salary_confidence = COALESCE(?, salary_confidence),
                english_level = COALESCE(?, english_level),
                roles_json = CASE WHEN ? IS NOT NULL AND ? != '[]' THEN ? ELSE roles_json END,
                skills_json = CASE WHEN ? IS NOT NULL AND ? != '[]' THEN ? ELSE skills_json END,
                primary_languages_json = CASE WHEN ? IS NOT NULL AND ? != '[]' THEN ? ELSE primary_languages_json END,
                roles_norm = CASE WHEN ? != '|' THEN ? ELSE roles_norm END,
                skills_norm = CASE WHEN ? != '|' THEN ? ELSE skills_norm END,
                backend_focus = COALESCE(?, backend_focus),
                updated_at = ?
            WHERE candidate_id = ?""",
            (
                fields.get("name"),
                fields.get("location"),
                fields.get("remote"),
                fields.get("experience_years"),
                fields.get("experience_years_eng"),
                fields.get("experience_confidence"),
                fields.get("salary_min"),
                fields.get("salary_max"),
                fields.get("salary_confidence"),
                fields.get("english_level"),
                safe_json(fields.get("roles", [])),
                safe_json(fields.get("roles", [])),
                safe_json(fields.get("roles", [])),
                safe_json(fields.get("skills", [])),
                safe_json(fields.get("skills", [])),
                safe_json(fields.get("skills", [])),
                primary_languages_json,
                primary_languages_json,
                primary_languages_json,
                fields.get("roles_norm") or "|",
                fields.get("roles_norm") or "|",
                fields.get("skills_norm") or "|",
                fields.get("skills_norm") or "|",
                backend_focus_int,
                utc_iso(),
                candidate_id,
            ),
        )
def _resume_by_sha(con: sqlite3.Connection, sha: str) -> Optional[str]:
row = con.execute("SELECT resume_id FROM resumes WHERE sha256=?", (sha,)).fetchone()
return row["resume_id"] if row else None
def _near_duplicate_active_resume(con: sqlite3.Connection, simh: int, max_dist: int) -> Optional[Tuple[str, int]]:
    """Find the closest active resume within ``max_dist`` simhash bits.

    Candidate resumes are gathered through the banded simhash index; each
    active one is then compared by Hamming distance, and the closest
    (resume_id, distance) pair is returned — or None when nothing is close.
    """
    candidate_ids = set()
    for bucket, band in simhash_bands(simh):
        rows = con.execute(
            "SELECT resume_id FROM simhash_buckets WHERE bucket=? AND band=?", (bucket, band)
        ).fetchall()
        candidate_ids.update(r["resume_id"] for r in rows)
    winner: Optional[Tuple[str, int]] = None
    for resume_id in candidate_ids:
        hit = con.execute("SELECT simhash FROM resumes WHERE resume_id=? AND is_active=1", (resume_id,)).fetchone()
        if hit is None or hit["simhash"] is None:
            continue
        try:
            stored = int(str(hit["simhash"]), 16)  # simhash is stored as hex text
        except Exception:
            continue
        dist = hamming64(stored, simh)
        if dist > max_dist:
            continue
        if winner is None or dist < winner[1]:
            winner = (resume_id, dist)
    return winner
def _insert_resume(
    con: sqlite3.Connection,
    candidate_id: str,
    sha: Optional[str],
    simh: int,
    clean_text: str,
    raw_text: str,
    extraction_json: str,
    llm_summary: Optional[str],
    llm_tags: List[str],
    extract_method: Optional[str],
    extract_quality_score: Optional[float],
    extract_quality_flags: Optional[str],
    extract_pages_json: Optional[str],
    doc_type: Optional[str],
    doc_type_confidence: Optional[float],
    parse_method: Optional[str],
    parse_version: Optional[str],
    sections_json: Optional[str],
    file_path: Optional[str],
    mtime: Optional[int],
    size: Optional[int],
    near_dup_of: Optional[str],
) -> str:
    """Insert a new active resume row and index its simhash bands.

    When ``near_dup_of`` is given, that older resume is deactivated first so
    only the newest member of a near-duplicate cluster stays active; the new
    row records the old id in duplicate_of_resume_id.

    Returns the freshly generated resume_id.
    """
    resume_id = "res_" + uuid.uuid4().hex
    if near_dup_of:
        # The new resume supersedes its near-duplicate: deactivate the old row.
        con.execute("UPDATE resumes SET is_active=0 WHERE resume_id=?", (near_dup_of,))
    # NOTE: column list and value tuple below are positional — keep in sync.
    con.execute(
        """INSERT INTO resumes(
            resume_id, candidate_id, sha256, simhash, clean_text, raw_text, extraction_json,
            llm_summary, llm_tags_json,
            extract_method, extract_quality_score, extract_quality_flags, extract_pages_json,
            doc_type, doc_type_confidence, parse_method, parse_version, sections_json,
            is_active, duplicate_of_resume_id, file_path, file_mtime, file_size, created_at
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
        (
            resume_id,
            candidate_id,
            sha,
            f"{simh:016x}",  # simhash stored as 16-digit zero-padded hex text
            clean_text,
            raw_text[:250000],  # cap raw text to keep row size bounded
            extraction_json,
            llm_summary,
            safe_json(llm_tags),
            extract_method,
            extract_quality_score,
            extract_quality_flags,
            extract_pages_json,
            doc_type,
            doc_type_confidence,
            parse_method,
            parse_version,
            sections_json,
            1,  # new resumes start active
            near_dup_of,
            file_path,
            mtime,
            size,
            utc_iso(),
        ),
    )
    # Index the simhash in banded buckets for cheap near-duplicate lookup later.
    for bucket, band in simhash_bands(simh):
        con.execute(
            "INSERT OR IGNORE INTO simhash_buckets(bucket, band, resume_id) VALUES (?,?,?)",
            (bucket, band, resume_id),
        )
    return resume_id
def _insert_source(con: sqlite3.Connection, resume_id: str, src: Dict[str, Any]) -> None:
con.execute(
"""INSERT INTO sources(
resume_id, export_path, chat_title, message_id, message_date,
origin_type, original_file_path, original_file_name, extra_json
) VALUES (?,?,?,?,?,?,?,?,?)""",
(
resume_id,
src.get("export_path"),
src.get("chat_title"),
src.get("message_id"),
src.get("message_date"),
src.get("origin_type"),
src.get("file_path"),
src.get("original_name"),
json.dumps(src.get("extra", {}), ensure_ascii=False),
),
)
def _insert_positions(
con: sqlite3.Connection,
resume_id: str,
candidate_id: str,
positions: List[Dict[str, Any]],
) -> None:
if not positions:
return
for p in positions:
pos_id = "pos_" + uuid.uuid4().hex
con.execute(
"""INSERT INTO positions(
position_id, resume_id, candidate_id, title, company,
date_from, date_to, is_current, description, stack_json
) VALUES (?,?,?,?,?,?,?,?,?,?)""",
(
pos_id,
resume_id,
candidate_id,
p.get("title"),
p.get("company"),
p.get("date_from"),
p.get("date_to"),
1 if p.get("is_current") else 0 if p.get("is_current") is not None else None,
p.get("description"),
json.dumps(p.get("stack") or [], ensure_ascii=False),
),
)
def _update_files_seen(con: sqlite3.Connection, sha: str, size: int, mtime: int, canonical_resume_id: str) -> None:
    """Upsert the dedup-ledger entry for a file hash.

    A first sighting inserts a full row; a repeat sighting refreshes size,
    mtime, canonical_resume_id and last_seen_at while first_seen_at keeps
    its original value.
    """
    params = (sha, size, mtime, canonical_resume_id, utc_iso(), utc_iso())
    con.execute(
        """INSERT INTO files_seen(sha256, size, mtime, canonical_resume_id, first_seen_at, last_seen_at)
        VALUES (?,?,?,?,?,?)
        ON CONFLICT(sha256) DO UPDATE SET
            size=excluded.size,
            mtime=excluded.mtime,
            canonical_resume_id=excluded.canonical_resume_id,
            last_seen_at=excluded.last_seen_at
        """,
        params,
    )
# -----------------------------
# artifacts collection
# -----------------------------
def collect_artifacts(input_root: Path) -> List[Dict[str, Any]]:
    """Gather resume artifacts from every supported source under ``input_root``.

    Combines Telegram JSON exports, Telegram HTML exports, and a plain
    filesystem scan into one flat list of artifact dicts.
    """
    found: List[Dict[str, Any]] = []
    for result_json in find_result_json(input_root):
        found.extend(iter_json_artifacts(result_json))
    for messages_html in find_messages_html(input_root):
        found.extend(iter_html_artifacts(messages_html))
    found.extend(iter_file_scan(input_root))
    return found
# -----------------------------
# main pipeline
# -----------------------------
def import_exports(
    con: sqlite3.Connection,
    input_dir: str,
    log: Logger,
    max_near_dist: int = 6,
    min_text_len: int = 250,
    commit_every: int = 20,
) -> Dict[str, Any]:
    """Run the full import pipeline over a directory of Telegram exports.

    For every artifact (attached file or message text) found under
    ``input_dir`` this: extracts text, dedupes by sha256 and simhash,
    detects the document type, parses candidate fields via templates and
    heuristics, optionally enriches and reviews them with an LLM,
    normalizes the result, and persists candidate/resume/source/position
    rows. A failure on one artifact is counted and logged, never fatal.

    Args:
        con: open SQLite connection. NOTE(review): rows are read by column
            name in helpers, so a Row factory appears to be assumed —
            confirm at the call site.
        input_dir: root directory containing the export(s).
        log: pipeline logger.
        max_near_dist: max simhash Hamming distance for near-duplicates.
        min_text_len: artifacts whose raw or cleaned text is shorter are
            skipped and counted as "short_or_empty".
        commit_every: commit and log progress every N artifacts.

    Returns:
        Stats dict with counters plus up to 10 near-duplicate examples.

    Raises:
        SystemExit: when ``input_dir`` does not exist.
    """
    root = Path(input_dir).resolve()
    if not root.exists():
        raise SystemExit(f"Input not found: {root}")
    artifacts = collect_artifacts(root)
    log.info(f"[import] artifacts found: {len(artifacts)}", {"input": str(root)})
    # Counters accumulated across all artifacts; returned to the caller.
    stats: Dict[str, Any] = {
        "input": str(root),
        "artifacts": len(artifacts),
        "processed_new": 0,
        "dup_sha": 0,
        "near_dup": 0,
        "short_or_empty": 0,
        "errors": 0,
        "sources_added_only": 0,
        "llm_enriched": 0,
        "llm_reviewed": 0,
        "llm_review_changed": 0,
    }
    near_dup_examples: List[Dict[str, Any]] = []
    for i, a in enumerate(artifacts, start=1):
        try:
            # --- stage 1: text extraction (file or message body) ---
            raw_text = ""
            sha = None
            file_path = a.get("file_path")
            size = None
            mtime = None
            extract_method = None
            extract_score = None
            extract_flags: List[str] = []
            pages: List[Dict[str, Any]] = []
            if file_path:
                fp = Path(file_path)
                if not fp.exists():
                    continue
                st = fp.stat()
                size = int(st.st_size)
                mtime = int(st.st_mtime)
                sha = sha256_file(str(fp))
                # Exact-duplicate file: just record the extra source/sighting.
                existing_resume = _resume_by_sha(con, sha)
                if existing_resume:
                    stats["dup_sha"] += 1
                    _insert_source(con, existing_resume, a)
                    _update_files_seen(con, sha, size, mtime, existing_resume)
                    stats["sources_added_only"] += 1
                    continue
                if fp.suffix.lower() == ".pdf":
                    # PDFs go through the multi-method extractor with quality scoring.
                    pdf_res = extract_pdf_best(fp, timeout_sec=25)
                    raw_text = pdf_res.text
                    extract_method = pdf_res.method
                    extract_score = pdf_res.score
                    extract_flags = pdf_res.flags
                    pages = pdf_res.pages
                else:
                    try:
                        raw_text = extract_text_generic(fp) or ""
                    except Exception as e:
                        # Extraction failure is non-fatal: log and fall through
                        # to the short-text skip below.
                        if log:
                            log.warn("[extract] file failed - skipped", {"file": str(fp), "err": repr(e)})
                        raw_text = ""
                    extract_method = f"file_{fp.suffix.lower().lstrip('.') or 'unknown'}"
            else:
                # No attached file: the Telegram message text is the artifact.
                raw_text = a.get("message_text") or ""
                extract_method = "telegram_post"
            raw_text = coerce_text(raw_text)
            if not raw_text or len(raw_text.strip()) < min_text_len:
                stats["short_or_empty"] += 1
                continue
            # --- stage 2: normalization + doc-type detection ---
            clean = normalize_text(raw_text)
            if not clean or len(clean) < min_text_len:
                stats["short_or_empty"] += 1
                continue
            file_ext = Path(file_path).suffix.lower() if file_path else None
            dt = detect_doc_type(clean, file_ext=file_ext)
            # Hard overrides: message-text artifacts and scan-like PDFs get
            # a fixed doc type regardless of the detector's guess.
            if a.get("origin_type") == "message_text":
                from tg_resume_db.extract.doc_type import DocTypeResult
                dt = DocTypeResult(doc_type="telegram_post", confidence=0.92, signals=["telegram_message"])
            if extract_flags and "scan_like" in extract_flags:
                from tg_resume_db.extract.doc_type import DocTypeResult
                dt = DocTypeResult(doc_type="scan_pdf", confidence=0.9, signals=dt.signals + ["scan_like"])
            # --- stage 3: sections + work-history timeline ---
            sections = split_sections(clean, dt.doc_type)
            sections_list = sections_present(sections)
            exp_section_text = sections.get("experience") if isinstance(sections, dict) else None
            positions = extract_positions(exp_section_text or clean)
            position_dicts = positions_to_dicts(positions)
            # --- stage 4: template-specific parsing (confident types only) ---
            parser = tpl_generic
            if dt.confidence >= 0.8:
                if dt.doc_type == "hh_ru":
                    parser = tpl_hh
                elif dt.doc_type == "linkedin_pdf":
                    parser = tpl_linkedin
                elif dt.doc_type == "one_page_en":
                    parser = tpl_one_page_en
                elif dt.doc_type == "one_page_ru":
                    parser = tpl_one_page_ru
                elif dt.doc_type == "pptx_export":
                    parser = tpl_pptx
            parsed = parser.parse_resume(clean, sections)
            parse_method = parsed.get("parse_method") or "generic_heur"
            # --- stage 5: field extraction; template output wins, heuristics fill gaps ---
            contacts_raw = parsed.get("contacts_raw") or extract_contacts_raw(clean)
            contacts = normalize_contacts(contacts_raw, clean)
            name = parsed.get("name") or extract_name_guess(clean)
            remote = parsed.get("remote")
            if remote is None:
                remote = extract_remote(clean)
            english = parsed.get("english") or extract_english(clean)
            roles = parsed.get("roles") or []
            skills = parsed.get("skills") or []
            primary_languages: List[str] = []
            location = parsed.get("location") or extract_location_best_effort(clean)
            exp_years = parsed.get("exp_years")
            exp_years_eng = parsed.get("exp_years_eng")
            exp_conf = parsed.get("exp_conf")
            exp_dbg = parsed.get("exp_dbg") or {}
            if exp_years is None and exp_years_eng is None:
                exp_years, exp_years_eng, exp_conf, exp_dbg = extract_experience_years(clean)
            # Explicit "X years of experience" statements in the summary take
            # precedence, then figures are reconciled against the timeline.
            exp_years, exp_years_eng, exp_conf, exp_dbg = _prefer_explicit_summary_experience(
                clean_text=clean,
                exp_years=exp_years,
                exp_years_eng=exp_years_eng,
                exp_conf=exp_conf,
                exp_dbg=exp_dbg,
            )
            exp_years, exp_years_eng, exp_conf, exp_dbg = _reconcile_experience_fields(
                exp_years=exp_years,
                exp_years_eng=exp_years_eng,
                exp_conf=exp_conf,
                exp_dbg=exp_dbg,
                positions=position_dicts,
            )
            sal_min = parsed.get("salary_min")
            sal_max = parsed.get("salary_max")
            sal_conf = parsed.get("salary_conf")
            sal_dbg = parsed.get("salary_dbg") or {}
            if sal_min is None and sal_max is None:
                sal_min, sal_max, sal_conf, sal_dbg = extract_salary(clean)
            # --- stage 6: optional LLM enrichment (fills gaps, never blindly overrides) ---
            llm_summary: Optional[str] = None
            llm_tags: List[str] = []
            seniority: Optional[str] = None
            highlights: List[str] = []
            keywords: List[str] = []
            llm_enriched, llm_dbg = _maybe_llm_enrich(
                con=con,
                clean=clean,
                roles=roles,
                skills=skills,
                exp_conf=exp_conf,
                english=english,
                location=location,
                name=name,
                doc_type=dt.doc_type,
                sections=sections,
            )
            backend_focus_flag: Optional[bool] = None
            if llm_enriched:
                parse_method = "llm_rag"
                stats["llm_enriched"] += 1
                roles = _merge_lists(llm_enriched.roles, roles, limit=8)
                normalized_llm_langs = _normalize_language_list(llm_enriched.primary_languages)
                if normalized_llm_langs:
                    primary_languages = _merge_lists(normalized_llm_langs, primary_languages, limit=8)
                    skills = _merge_lists(normalized_llm_langs, skills, limit=48)
                skills = _merge_lists(llm_enriched.skills, skills, limit=48)
                # Heuristic values are only replaced when missing (or, for
                # english, when the LLM's claim passes a plausibility check).
                if remote is None and llm_enriched.remote_ok is not None:
                    remote = llm_enriched.remote_ok
                if not location and llm_enriched.location:
                    location = llm_enriched.location
                if not english and llm_enriched.english_level and _can_accept_llm_english(clean, llm_enriched.english_level):
                    english = llm_enriched.english_level
                backend_focus_flag = llm_enriched.backend_focus
                if llm_enriched.backend_focus is True:
                    roles = _merge_lists(["backend"], roles, limit=8)
                elif llm_enriched.backend_focus is False:
                    # Drop "backend" and dedupe (case-insensitive, order kept).
                    pruned_roles: List[str] = []
                    seen_roles = set()
                    for r in roles:
                        if r.lower() == "backend":
                            continue
                        rl = r.lower()
                        if rl in seen_roles:
                            continue
                        seen_roles.add(rl)
                        pruned_roles.append(r)
                    roles = pruned_roles
                # LLM experience only overrides low-confidence heuristics.
                if (exp_conf is None or exp_conf < 0.6) and llm_enriched.experience_years_total is not None:
                    exp_years = llm_enriched.experience_years_total
                    exp_conf = 0.65
                if llm_enriched.experience_years_engineering is not None:
                    exp_years_eng = llm_enriched.experience_years_engineering
                # Prefer RUB figures; fall back to USD only when nothing found.
                sal_min, sal_max, sal_conf = _pick_salary(
                    sal_min, sal_max, sal_conf, llm_enriched.salary_min_rub, llm_enriched.salary_max_rub
                )
                if sal_min is None and sal_max is None:
                    sal_min, sal_max, sal_conf = _pick_salary(
                        sal_min, sal_max, sal_conf, llm_enriched.salary_min_usd, llm_enriched.salary_max_usd
                    )
                seniority = llm_enriched.seniority
                highlights = [h.strip() for h in llm_enriched.highlights if h.strip()]
                if highlights:
                    llm_summary = "; ".join(highlights)[:800]
                keywords = _merge_lists(llm_enriched.keywords, keywords, limit=40)
                llm_tags = _merge_lists(llm_enriched.keywords, llm_tags, limit=24)
                llm_tags = _merge_lists(llm_enriched.skills, llm_tags, limit=24)
                llm_tags = _merge_lists(llm_enriched.primary_languages, llm_tags, limit=24)
            desired_title = parsed.get("desired_title")
            if desired_title:
                roles = _merge_lists(_roles_from_desired_title(desired_title), roles, limit=8)
            # --- stage 7: optional LLM review rounds over the draft profile ---
            llm_review_mode = _llm_review_mode()
            llm_review_rounds_dbg: List[Dict[str, Any]] = []
            llm_review_merge_dbg: List[Dict[str, Any]] = []
            llm_review_used = False
            llm_review_changed = False
            if llm_parse_enabled() and _llm_review_needed(
                mode=llm_review_mode,
                llm_enriched_used=bool(llm_enriched),
                name=name,
                roles=roles,
                skills=skills,
                exp_conf=exp_conf,
                english=english,
                location=location,
            ):
                for _ in range(_llm_review_rounds()):
                    review_draft = _build_llm_review_draft(
                        roles=roles,
                        skills=skills,
                        primary_languages=primary_languages,
                        seniority=seniority,
                        backend_focus=backend_focus_flag,
                        exp_years=exp_years,
                        exp_years_eng=exp_years_eng,
                        english=english,
                        location=location,
                        remote=remote,
                        sal_min=sal_min,
                        sal_max=sal_max,
                        highlights=highlights,
                        keywords=keywords,
                    )
                    review_res, review_dbg = llm_review_profile(
                        clean,
                        draft=review_draft,
                        con=con,
                        doc_type=dt.doc_type,
                        sections=sections,
                    )
                    llm_review_rounds_dbg.append(review_dbg)
                    if not review_res:
                        continue
                    llm_review_used = True
                    merged, merge_dbg = _merge_review_result(
                        review=review_res,
                        review_dbg=review_dbg,
                        roles=roles,
                        skills=skills,
                        primary_languages=primary_languages,
                        seniority=seniority,
                        backend_focus=backend_focus_flag,
                        remote=remote,
                        location=location,
                        english=english,
                        exp_years=exp_years,
                        exp_years_eng=exp_years_eng,
                        exp_conf=exp_conf,
                        sal_min=sal_min,
                        sal_max=sal_max,
                        sal_conf=sal_conf,
                        highlights=highlights,
                        keywords=keywords,
                        llm_summary=llm_summary,
                        llm_tags=llm_tags,
                    )
                    llm_review_merge_dbg.append(merge_dbg)
                    # Adopt the merged draft wholesale for the next round.
                    roles = merged["roles"]
                    skills = merged["skills"]
                    primary_languages = merged["primary_languages"]
                    seniority = merged["seniority"]
                    backend_focus_flag = merged["backend_focus"]
                    remote = merged["remote"]
                    location = merged["location"]
                    english = merged["english"]
                    exp_years = merged["exp_years"]
                    exp_years_eng = merged["exp_years_eng"]
                    exp_conf = merged["exp_conf"]
                    sal_min = merged["sal_min"]
                    sal_max = merged["sal_max"]
                    sal_conf = merged["sal_conf"]
                    highlights = merged["highlights"]
                    keywords = merged["keywords"]
                    llm_summary = merged["llm_summary"]
                    llm_tags = merged["llm_tags"]
                    if merge_dbg.get("changed_fields"):
                        llm_review_changed = True
                    else:
                        # Review converged (no changes) — stop early.
                        break
            if llm_review_used:
                stats["llm_reviewed"] += 1
                if llm_review_changed:
                    stats["llm_review_changed"] += 1
                if "+llm_review" not in parse_method:
                    parse_method = f"{parse_method}+llm_review"
            llm_review_meta = {
                "enabled": llm_parse_enabled(),
                "mode": llm_review_mode,
                "used": llm_review_used,
                "changed": llm_review_changed,
                "rounds": llm_review_rounds_dbg,
                "merge": llm_review_merge_dbg,
            }
            # --- stage 8: final normalization of the merged profile ---
            roles = normalize_roles(roles)
            roles = _prune_roles_by_evidence(roles, clean)
            skills = normalize_skills(skills)
            skills_primary, skills_secondary = split_skills_primary_secondary(
                skills,
                clean_text=clean,
                sections=sections,
            )
            location = normalize_location(location)
            exp_years, exp_years_eng, exp_conf, exp_dbg = _reconcile_experience_fields(
                exp_years=exp_years,
                exp_years_eng=exp_years_eng,
                exp_conf=exp_conf,
                exp_dbg=exp_dbg,
                positions=position_dicts,
            )
            if not primary_languages:
                # Fall back to programming languages recognizable in the skill list.
                language_from_skills = []
                for sk in skills:
                    tok = _norm_lang_token(sk)
                    if tok:
                        language_from_skills.append(tok)
                primary_languages = _merge_lists(language_from_skills, primary_languages, limit=8)
            skills, primary_languages = _drop_false_java(skills, primary_languages, clean)
            # --- stage 9: candidate identity + DB upserts ---
            simh = simhash64(to_fts_text(clean))
            candidate_id = _candidate_by_contact(con, contacts) or stable_candidate_id(contacts, name, simh)
            _ensure_candidate(con, candidate_id, {
                "name": name,
                "location": location,
                "remote": (1 if remote else 0) if remote is not None else None,
                "experience_years": exp_years,
                "experience_years_eng": exp_years_eng,
                "experience_confidence": exp_conf if exp_years is not None else None,
                "salary_min": sal_min,
                "salary_max": sal_max,
                "salary_confidence": sal_conf if sal_min is not None else None,
                "english_level": english,
                "roles": roles,
                "skills": skills,
                "primary_languages": primary_languages,
                "backend_focus": backend_focus_flag,
                "roles_norm": norm_pipe(roles),
                "skills_norm": norm_pipe(skills),
            })
            _upsert_contacts(con, candidate_id, contacts)
            _upsert_candidate_skills(con, candidate_id, skills_primary, skills_secondary, parse_method)
            _upsert_candidate_roles(con, candidate_id, roles, parse_method)
            _upsert_candidate_languages(con, candidate_id, english, parse_method)
            # --- stage 10: simhash near-duplicate detection ---
            near = _near_duplicate_active_resume(con, simh, max_dist=max_near_dist)
            near_dup_of = near[0] if near else None
            if near_dup_of:
                stats["near_dup"] += 1
                if len(near_dup_examples) < 10:
                    near_dup_examples.append({
                        "new_file": file_path,
                        "dup_of": near_dup_of,
                        "dist": near[1],
                        "candidate_id": candidate_id,
                    })
            # --- stage 11: persist the resume row with full extraction debug JSON ---
            extraction = {
                "name_guess": name,
                "contacts": contacts,
                "doc_type": {
                    "type": dt.doc_type,
                    "confidence": dt.confidence,
                    "signals": dt.signals,
                },
                "extract": {
                    "method": extract_method,
                    "quality_score": extract_score,
                    "quality_flags": extract_flags,
                    "pages": pages[:40],
                },
                "sections_present": sections_list,
                "parse": {
                    "method": parse_method,
                    "version": _PARSE_VERSION,
                },
                "desired_title": desired_title,
                "skills_primary": skills_primary,
                "skills_secondary": skills_secondary,
                "hh_meta": {
                    "specializations": parsed.get("specializations"),
                    "employment_type": parsed.get("employment_type"),
                    "schedule": parsed.get("schedule"),
                },
                "positions": positions_to_dicts(positions),
                "positions_count": len(position_dicts),
                "experience": {
                    "years": exp_years,
                    "years_engineering": exp_years_eng,
                    "confidence": exp_conf,
                    "debug": exp_dbg
                },
                "salary": {"min": sal_min, "max": sal_max, "confidence": sal_conf, "debug": sal_dbg},
                "location_guess": location,
                "roles": roles,
                "skills": skills,
                "primary_languages": primary_languages,
                "remote_guess": remote,
                "english": english,
                "llm_summary": llm_summary,
                "llm_tags": llm_tags,
                "seniority": seniority,
                "backend_focus": backend_focus_flag,
                "highlights": highlights,
                "keywords": keywords,
                "llm": {
                    "used": bool(llm_enriched),
                    "debug": llm_dbg,
                    "data": asdict(llm_enriched) if llm_enriched else None,
                    "review": llm_review_meta,
                },
            }
            resume_id = _insert_resume(
                con=con,
                candidate_id=candidate_id,
                sha=sha,
                simh=simh,
                clean_text=clean,
                raw_text=raw_text,
                extraction_json=json.dumps(extraction, ensure_ascii=False),
                llm_summary=llm_summary,
                llm_tags=llm_tags,
                extract_method=extract_method,
                extract_quality_score=extract_score,
                extract_quality_flags=json.dumps(extract_flags, ensure_ascii=False),
                extract_pages_json=json.dumps(pages[:40], ensure_ascii=False),
                doc_type=dt.doc_type,
                doc_type_confidence=dt.confidence,
                parse_method=parse_method,
                parse_version=_PARSE_VERSION,
                sections_json=json.dumps(sections, ensure_ascii=False),
                file_path=file_path,
                mtime=mtime,
                size=size,
                near_dup_of=near_dup_of,
            )
            _insert_source(con, resume_id, a)
            _insert_positions(con, resume_id, candidate_id, position_dicts)
            if sha and size is not None and mtime is not None:
                _update_files_seen(con, sha, size, mtime, resume_id)
            stats["processed_new"] += 1
            # Periodic commit keeps progress durable on long imports.
            if i % commit_every == 0:
                con.commit()
                log.info(
                    f"[import] progress {i}/{len(artifacts)} "
                    f"new={stats['processed_new']} dup_sha={stats['dup_sha']} "
                    f"near={stats['near_dup']} err={stats['errors']}",
                    {},
                )
        except Exception as e:
            # Per-artifact failures are logged and counted, never fatal.
            stats["errors"] += 1
            log.error("[import] artifact failed", {"err": repr(e), "artifact": a})
    con.commit()
    stats["near_dup_examples"] = near_dup_examples
    log.info("[import] done", stats)
    return stats