from __future__ import annotations import json import os import re import shutil import sqlite3 import subprocess import uuid from dataclasses import asdict from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from tg_resume_db.util import Logger, utc_iso from tg_resume_db.extract.text_extract import extract_text as extract_text_generic from tg_resume_db.extract.clean import normalize_text, to_fts_text from tg_resume_db.extract.pdf_extract import extract_pdf_best from tg_resume_db.extract.llm import ( LLMExtraction, llm_extract_profile, llm_parse_enabled, llm_review_profile, ) from tg_resume_db.extract.doc_type import detect_doc_type from tg_resume_db.extract.sections import split_sections, sections_present from tg_resume_db.extract.experience_timeline import extract_positions, positions_to_dicts from tg_resume_db.extract.parse import ( extract_contacts as extract_contacts_raw, extract_name_guess, extract_remote, extract_english, extract_salary, extract_location_best_effort, extract_experience_years, # Updated function norm_pipe, safe_json, ) from tg_resume_db.extract.templates import generic as tpl_generic from tg_resume_db.extract.templates import hh_ru as tpl_hh from tg_resume_db.extract.templates import linkedin as tpl_linkedin from tg_resume_db.extract.templates import one_page_en as tpl_one_page_en from tg_resume_db.extract.templates import one_page_ru as tpl_one_page_ru from tg_resume_db.extract.templates import pptx_export as tpl_pptx from tg_resume_db.normalize import ( normalize_skills, normalize_roles, split_skills_primary_secondary, normalize_location, ) from tg_resume_db.dedup.simhash import ( sha256_file, sha1_str, simhash64, simhash_bands, hamming64, ) from tg_resume_db.importers.telegram_json import find_result_json, iter_artifacts as iter_json_artifacts from tg_resume_db.importers.telegram_html import find_messages_html, iter_artifacts as iter_html_artifacts from tg_resume_db.importers.file_scan import iter_files as iter_file_scan _PARSE_VERSION = "v3_llm_review" # ----------------------------- # helpers: make everything text # ----------------------------- def coerce_text(x: Any) -> str: """Turn Telegram-export weird structures (dict/list/bytes) into plain text.""" if x is None: return "" if isinstance(x, str): return x if isinstance(x, bytes): for enc in ("utf-8", "utf-16", "cp1251", "latin-1"): try: return x.decode(enc, errors="ignore") except Exception: pass return x.decode("utf-8", errors="ignore") if isinstance(x, list): parts: List[str] = [] for item in x: if isinstance(item, dict): parts.append(coerce_text(item.get("text") or item.get("href") or "")) else: parts.append(coerce_text(item)) return "".join(parts) if isinstance(x, dict): if "text" in x: return coerce_text(x["text"]) if "content" in x: return coerce_text(x["content"]) return json.dumps(x, ensure_ascii=False) return str(x) # ----------------------------- # PDF extraction: prefer pdftotext # ----------------------------- def _which_pdftotext() -> Optional[str]: if os.environ.get("PDFTOTEXT_ENABLE", "0").lower() not in ("1", "true", "yes"): return None exe = shutil.which("pdftotext") or shutil.which("pdftotext.exe") return exe def extract_text_from_pdf_pdftotext(fp: Path, timeout_sec: int = 25) -> str: exe = _which_pdftotext() if not exe: return "" cmd = [exe, "-layout", "-nopgbrk", str(fp), "-"] try: p = subprocess.run( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout_sec, check=False, text=True, encoding="utf-8", errors="ignore", ) return (p.stdout or "").strip() except subprocess.TimeoutExpired: return "" except Exception: return "" def extract_text_resilient(fp: Path, log: Optional[Logger] = None, timeout_sec: int = 25) -> str: ext = fp.suffix.lower() if ext == ".pdf": out = extract_text_from_pdf_pdftotext(fp, timeout_sec=timeout_sec) if out: return out try: return extract_text_generic(fp) or "" except Exception as e: if log: log.warn("[extract] pdf failed - skipped", {"file": str(fp), "err": repr(e)}) return "" try: return extract_text_generic(fp) or "" except Exception as e: if log: log.warn("[extract] file failed - skipped", {"file": str(fp), "err": repr(e)}) return "" # ----------------------------- # contacts normalization + phone/tg cleanup # ----------------------------- _EMAIL_RE = re.compile(r"\b[a-zA-Z0-9._%+\-]{1,64}@[a-zA-Z0-9.\-]{1,253}\.[a-zA-Z]{2,}\b") _EMAIL_SPLIT_RE = re.compile( r"(?[a-z0-9][a-z0-9._%+\-]{1,40})\s+" r"(?P[a-z0-9][a-z0-9._%+\-]{0,40}@[a-z0-9.\-]+\.[a-z]{2,})", re.I, ) _TG_AT_RE = re.compile(r"(? Optional[str]: s = s.strip().lower() if _EMAIL_RE.fullmatch(s): return s return None def _recover_split_emails(text: str) -> List[str]: out: List[str] = [] for m in _EMAIL_SPLIT_RE.finditer(text or ""): prefix = (m.group("prefix") or "").strip().lower().strip(".-_") if not prefix or prefix in _EMAIL_PREFIX_STOP: continue if not re.search(r"[._\-\d]", prefix): continue tail = (m.group("tail") or "").strip().lower() if "@" not in tail: continue local_tail, domain = tail.split("@", 1) local = f"{prefix}{local_tail}" if len(local) > 64: continue cand = f"{local}@{domain}" if _EMAIL_RE.fullmatch(cand): out.append(cand) return out def _prune_fragment_emails(values: List[str]) -> List[str]: uniq = sorted(set(v.lower().strip() for v in values if v and "@" in v)) out: List[str] = [] for e in uniq: local, domain = e.split("@", 1) drop = False for other in uniq: if other == e: continue ol, od = other.split("@", 1) if od != domain: continue if len(local) <= 8 and len(ol) > len(local) + 2 and ol.endswith(local) and re.search(r"[._\-]", ol): drop = True break if not drop: out.append(e) return out def _looks_like_month_range(digits: str) -> bool: if len(digits) == 12: try: mm1 = int(digits[0:2]); yyyy1 = int(digits[2:6]) mm2 = int(digits[6:8]); yyyy2 = int(digits[8:12]) if 1 <= mm1 <= 12 and 1900 <= yyyy1 <= 2100 and 1 <= mm2 <= 12 and 1900 <= yyyy2 <= 2100: return True except Exception: return False return False def _norm_phone(s: str) -> Optional[str]: raw = s.strip() if not (raw.startswith("+") or raw.startswith("7") or raw.startswith("8")): return None digits = re.sub(r"\D+", "", raw) if len(digits) < 10 or len(digits) > 15: return None if len(set(digits)) <= 2: return None if _looks_like_month_range(digits): return None if len(digits) == 12 and digits.startswith(("2", "3", "4", "5", "6", "7", "8", "9")): if digits.count("0") >= 6: return None return "+" + digits def _norm_tg_handle(handle: str) -> Optional[str]: h = handle.strip().lstrip("@").lower() if not (5 <= len(h) <= 32): return None if not re.fullmatch(r"[a-z0-9_]+", h): return None if h.isdigit(): return None if h in _TG_STOP: return None return h def normalize_contacts(raw: Any, clean_text: str) -> Dict[str, List[str]]: out: Dict[str, List[str]] = {"email": [], "phone": [], "tg": [], "github": [], "linkedin": []} if isinstance(raw, dict): key_map = { "emails": "email", "email": "email", "phones": "phone", "phone": "phone", "telegram": "tg", "tg": "tg", "github": "github", "linkedin": "linkedin", } for k, v in raw.items(): nk = key_map.get(k) if not nk: continue vals = [coerce_text(x) for x in v] if isinstance(v, list) else [coerce_text(v)] out[nk].extend(vals) for e in _EMAIL_RE.findall(clean_text): out["email"].append(e) for e in _recover_split_emails(clean_text): out["email"].append(e) for chunk in _PHONE_CHUNK_RE.findall(clean_text): out["phone"].append(chunk) for h in _TG_AT_RE.findall(clean_text): out["tg"].append(h) for h in _TG_LINK_RE.findall(clean_text): out["tg"].append(h) def uniq(seq: List[str]) -> List[str]: seen = set() res = [] for x in seq: if x in seen: continue seen.add(x) res.append(x) return res emails: List[str] = [] for s in out["email"]: n = _norm_email(s) if n: emails.append(n) phones: List[str] = [] for s in out["phone"]: n = _norm_phone(s) if n: phones.append(n) tgs: List[str] = [] for s in out["tg"]: n = _norm_tg_handle(s) if n: tgs.append(n) out["email"] = uniq(_prune_fragment_emails(emails)) out["phone"] = uniq(phones) out["tg"] = uniq(tgs) out["github"] = uniq([coerce_text(x).strip() for x in out["github"] if coerce_text(x).strip()]) out["linkedin"] = uniq([coerce_text(x).strip() for x in out["linkedin"] if coerce_text(x).strip()]) return out # ----------------------------- # LLM helpers # ----------------------------- _LANGUAGE_CANON = { "python", "java", "kotlin", "go", "golang", "c++", "cpp", "c#", "javascript", "typescript", "ruby", "php", "swift", "objective-c", "scala", "rust", "dart", } _LANGUAGE_ALIAS = { "golang": "go", "cpp": "c++", "c plus plus": "c++", "csharp": "c#", "c#": "c#", "js": "javascript", "ts": "typescript", } _JAVA_REAL_RE = re.compile(r"\b(java\s*(8|11|17|21)|spring|jvm|maven|gradle|jakarta)\b", re.I) _JAVASCRIPT_RE = re.compile(r"\b(java\s*script|javascript|js)\b", re.I) def _norm_lang_token(token: str) -> Optional[str]: raw = (token or "").strip().lower() if not raw: return None norm = _LANGUAGE_ALIAS.get(raw, raw) if norm in _LANGUAGE_CANON: # collapse golang -> go, cpp -> c++ if norm == "golang": norm = "go" if norm == "cpp": norm = "c++" return norm return None def _normalize_language_list(values: List[str]) -> List[str]: seen = set() out: List[str] = [] for v in values or []: tok = _norm_lang_token(v) if not tok or tok in seen: continue seen.add(tok) out.append(tok) return out def _drop_false_java( skills: List[str], primary_languages: List[str], clean_text: str, ) -> Tuple[List[str], List[str]]: norm_skills = [str(s).strip().lower() for s in (skills or [])] if "java" not in norm_skills: return skills, primary_languages txt = clean_text or "" has_js = _JAVASCRIPT_RE.search(txt) is not None has_real_java = _JAVA_REAL_RE.search(txt) is not None if has_js and not has_real_java: cleaned_skills = [s for s in skills if str(s).strip().lower() != "java"] cleaned_langs = [s for s in primary_languages if str(s).strip().lower() != "java"] return cleaned_skills, cleaned_langs return skills, primary_languages def _roles_from_desired_title(title: Optional[str]) -> List[str]: if not title: return [] t = title.lower() out: List[str] = [] if "backend" in t or "бэкенд" in t or "бекенд" in t: out.append("backend") if "frontend" in t or "фронтенд" in t: out.append("frontend") if "fullstack" in t or "full stack" in t or "фулстек" in t: out.append("fullstack") if "devops" in t or "sre" in t: out.append("devops") if "qa" in t or "test" in t or "тестировщик" in t: out.append("qa") if "data" in t or "ml" in t or "machine learning" in t or "аналитик" in t: out.append("data") if "android" in t or "ios" in t or "mobile" in t or "мобиль" in t: out.append("mobile") return out def _merge_lists(base: List[str], extra: List[str], limit: Optional[int] = None) -> List[str]: seen = set() out: List[str] = [] for seq in (base or [], extra or []): for x in seq: t = str(x).strip() if not t or t.lower() in seen: continue seen.add(t.lower()) out.append(t) if limit is not None and len(out) >= limit: return out return out def _pick_salary( heur_min: Optional[int], heur_max: Optional[int], heur_conf: Optional[float], llm_min: Optional[int], llm_max: Optional[int], ) -> Tuple[Optional[int], Optional[int], Optional[float]]: if heur_min or heur_max: if heur_conf is None: heur_conf = 0.55 return heur_min, heur_max, heur_conf if llm_min or llm_max: return llm_min, llm_max, 0.65 return heur_min, heur_max, heur_conf _EN_SIGNAL_RE = re.compile(r"\b(english|англий|ielts|toefl|cefr|a1|a2|b1|b2|c1|c2)\b", re.I) def _has_english_signal(text: str) -> bool: if not text: return False return _EN_SIGNAL_RE.search(text) is not None def _can_accept_llm_english(clean_text: str, level: Optional[str]) -> bool: if not level: return False # Require explicit language signal in CV to avoid invented C1/C2. return _has_english_signal(clean_text) _ROLE_EVIDENCE_PATTERNS: Dict[str, re.Pattern] = { "qa": re.compile(r"\b(qa|quality assurance|tester|test engineer|test automation)\b", re.I), "devops": re.compile(r"\b(devops|dev ops|sre|platform engineer|infrastructure engineer)\b", re.I), "mobile": re.compile(r"\b(mobile|android|ios|react native|flutter)\b", re.I), "data": re.compile(r"\b(data engineer|data scientist|ml engineer|machine learning)\b", re.I), "architect": re.compile(r"\b(architect|solution architect|software architect)\b", re.I), } def _prune_roles_by_evidence(roles: List[str], clean_text: str) -> List[str]: out: List[str] = [] seen = set() t = (clean_text or "").lower() for role in roles or []: r = str(role).strip().lower() if not r or r in seen: continue seen.add(r) pat = _ROLE_EVIDENCE_PATTERNS.get(r) if pat is not None and not pat.search(t): continue out.append(r) return out def _parse_ym(date_iso: Optional[str]) -> Optional[Tuple[int, int]]: if not date_iso: return None m = re.match(r"^\s*(\d{4})-(\d{2})", str(date_iso).strip()) if not m: return None y = int(m.group(1)) mm = int(m.group(2)) if not (1900 <= y <= 2100 and 1 <= mm <= 12): return None return (y, mm) def _months_between(a: Tuple[int, int], b: Tuple[int, int]) -> int: return (b[0] - a[0]) * 12 + (b[1] - a[1]) def _experience_years_from_positions(position_dicts: List[Dict[str, Any]]) -> Optional[float]: intervals: List[Tuple[Tuple[int, int], Tuple[int, int]]] = [] for p in position_dicts or []: if not isinstance(p, dict): continue a = _parse_ym(p.get("date_from")) b = _parse_ym(p.get("date_to")) if not a or not b: continue if b < a: a, b = b, a intervals.append((a, b)) if not intervals: return None intervals.sort(key=lambda x: x[0]) merged: List[Tuple[Tuple[int, int], Tuple[int, int]]] = [intervals[0]] for s, e in intervals[1:]: ls, le = merged[-1] if s <= le: if e > le: merged[-1] = (ls, e) else: merged.append((s, e)) months = 0 for s, e in merged: months += max(0, _months_between(s, e)) years = round(months / 12.0, 2) if 0.0 <= years <= 60.0: return years return None def _reconcile_experience_fields( *, exp_years: Optional[float], exp_years_eng: Optional[float], exp_conf: Optional[float], exp_dbg: Dict[str, Any], positions: List[Dict[str, Any]], ) -> Tuple[Optional[float], Optional[float], Optional[float], Dict[str, Any]]: dbg = dict(exp_dbg or {}) source_notes: List[str] = [] pos_years = _experience_years_from_positions(positions) if pos_years is not None: dbg["positions_years"] = pos_years if exp_years is None and pos_years is not None: exp_years = pos_years exp_conf = max(float(exp_conf or 0.0), 0.74) source_notes.append("positions_fallback") elif exp_years is not None and pos_years is not None and pos_years > (float(exp_years) + 1.0): method = str(dbg.get("method") or "") strong_summary = method in ("summary", "header_chunk") and float(exp_conf or 0.0) >= 0.78 if strong_summary and (pos_years - float(exp_years)) > 1.5: source_notes.append("positions_reconcile_skip_strong_summary") else: exp_years = pos_years exp_conf = max(float(exp_conf or 0.0), 0.75) source_notes.append("positions_reconcile_up") # Prevent impossible split like total=1.5 while engineering=7.0. try: if exp_years is not None and exp_years_eng is not None: if float(exp_years) < float(exp_years_eng) * 0.7: exp_years = float(exp_years_eng) exp_conf = max(float(exp_conf or 0.0), 0.74) source_notes.append("eng_gt_total_fix") except Exception: pass is_recruiter = bool(dbg.get("is_recruiter")) if exp_years_eng is None and exp_years is not None and not is_recruiter: exp_years_eng = float(exp_years) source_notes.append("eng_fill_from_total") if source_notes: dbg["reconcile"] = source_notes return exp_years, exp_years_eng, exp_conf, dbg def _prefer_explicit_summary_experience( *, clean_text: str, exp_years: Optional[float], exp_years_eng: Optional[float], exp_conf: Optional[float], exp_dbg: Dict[str, Any], ) -> Tuple[Optional[float], Optional[float], Optional[float], Dict[str, Any]]: try: clean_total, clean_eng, clean_conf, clean_dbg = extract_experience_years(clean_text or "") except Exception: return exp_years, exp_years_eng, exp_conf, exp_dbg if clean_total is None: return exp_years, exp_years_eng, exp_conf, exp_dbg if exp_years is None: merged_dbg = dict(exp_dbg or {}) merged_dbg["clean_exp_method"] = (clean_dbg or {}).get("method") return clean_total, (clean_eng if clean_eng is not None else exp_years_eng), max(float(exp_conf or 0.0), float(clean_conf or 0.0)), merged_dbg parsed_method = str((exp_dbg or {}).get("method") or "") clean_method = str((clean_dbg or {}).get("method") or "") if clean_conf is not None and clean_conf >= 0.78 and clean_method in ("summary", "header_chunk"): try: if parsed_method.startswith("timeline") and float(clean_total) + 1.5 < float(exp_years): merged_dbg = dict(exp_dbg or {}) merged_dbg["clean_exp_method"] = clean_method merged_dbg["reconcile_clean"] = "prefer_explicit_summary" return clean_total, (clean_eng if clean_eng is not None else exp_years_eng), max(float(exp_conf or 0.0), float(clean_conf or 0.0)), merged_dbg except Exception: pass return exp_years, exp_years_eng, exp_conf, exp_dbg def _need_llm_fallback( *, roles: List[str], skills: List[str], exp_conf: Optional[float], english: Optional[str], location: Optional[str], name: Optional[str], doc_type: Optional[str], ) -> bool: if doc_type == "scan_pdf": return False if not name: return True if not roles and len(skills) < 2: return True if exp_conf is None or exp_conf < 0.4: return True if not english and not location and len(skills) < 2: return True return False def _maybe_llm_enrich( *, con: sqlite3.Connection, clean: str, roles: List[str], skills: List[str], exp_conf: Optional[float], english: Optional[str], location: Optional[str], name: Optional[str], doc_type: Optional[str], sections: Optional[Dict[str, str]], ) -> Tuple[Optional[LLMExtraction], Dict[str, Any]]: """ LLM runs only as fallback when heuristics are weak, unless forced via LLM_PARSE_FORCE=1. """ if not llm_parse_enabled(): return None, {"enabled": False} forced = os.environ.get("LLM_PARSE_FORCE", "0").lower() in ("1", "true", "yes") if not forced and not _need_llm_fallback( roles=roles, skills=skills, exp_conf=exp_conf, english=english, location=location, name=name, doc_type=doc_type, ): return None, {"enabled": True, "forced": False, "used": False, "reason": "heuristics_ok"} llm_res, llm_dbg = llm_extract_profile( clean, con=con, doc_type=doc_type, sections=sections, ) if isinstance(llm_dbg, dict): llm_dbg["forced"] = forced llm_dbg["used"] = bool(llm_res) return llm_res, llm_dbg _EN_ORDER = {"A1": 1, "A2": 2, "B1": 3, "B2": 4, "C1": 5, "C2": 6} def _llm_review_mode() -> str: mode = (os.environ.get("LLM_PARSE_REVIEW_MODE", "always") or "").strip().lower() if mode in ("0", "false", "no", "off"): return "off" if mode in ("auto", "smart", "on_demand"): return "auto" return "always" def _llm_review_rounds() -> int: raw = (os.environ.get("LLM_PARSE_REVIEW_ROUNDS", "1") or "").strip() try: rounds = int(raw) except Exception: rounds = 1 return max(1, min(rounds, 3)) def _normalize_cefr(level: Optional[str]) -> Optional[str]: if not level: return None m = re.search(r"\b(A1|A2|B1|B2|C1|C2)\b", str(level).upper()) return m.group(1) if m else None def _bounded_float(v: Any, lo: float, hi: float) -> Optional[float]: try: x = float(v) except Exception: return None if x < lo or x > hi: return None return float(round(x, 2)) def _bounded_int(v: Any, lo: int, hi: int) -> Optional[int]: try: x = int(float(v)) except Exception: return None if x < lo or x > hi: return None return x def _llm_review_needed( *, mode: str, llm_enriched_used: bool, name: Optional[str], roles: List[str], skills: List[str], exp_conf: Optional[float], english: Optional[str], location: Optional[str], ) -> bool: if mode == "off": return False if mode == "always": return True if llm_enriched_used: return True if not name: return True if not roles or len(skills) < 3: return True if exp_conf is None or exp_conf < 0.65: return True if not english or not location: return True return False def _build_llm_review_draft( *, roles: List[str], skills: List[str], primary_languages: List[str], seniority: Optional[str], backend_focus: Optional[bool], exp_years: Optional[float], exp_years_eng: Optional[float], english: Optional[str], location: Optional[str], remote: Optional[bool], sal_min: Optional[int], sal_max: Optional[int], highlights: List[str], keywords: List[str], ) -> Dict[str, Any]: return { "roles": roles[:12], "skills": skills[:64], "primary_languages": primary_languages[:12], "seniority": seniority, "backend_focus": backend_focus, "experience_years_total": exp_years, "experience_years_engineering": exp_years_eng, "english_level": _normalize_cefr(english), "location": location, "remote_ok": remote, "salary_min_rub": sal_min, "salary_max_rub": sal_max, "highlights": highlights[:8], "keywords": keywords[:40], } def _merge_review_result( *, review: LLMExtraction, review_dbg: Dict[str, Any], roles: List[str], skills: List[str], primary_languages: List[str], seniority: Optional[str], backend_focus: Optional[bool], remote: Optional[bool], location: Optional[str], english: Optional[str], exp_years: Optional[float], exp_years_eng: Optional[float], exp_conf: Optional[float], sal_min: Optional[int], sal_max: Optional[int], sal_conf: Optional[float], highlights: List[str], keywords: List[str], llm_summary: Optional[str], llm_tags: List[str], ) -> Tuple[Dict[str, Any], Dict[str, Any]]: quality = review_dbg.get("quality_score") try: quality_f = float(quality) if quality is not None else None except Exception: quality_f = None trusted = quality_f is None or quality_f >= 0.55 changed: List[str] = [] model_changed_raw = review_dbg.get("model_changed_fields") or [] model_changed = set() if isinstance(model_changed_raw, list): for x in model_changed_raw: s = str(x).strip() if s: model_changed.add(s) roles_out = list(roles or []) if review.roles: if trusted and "roles" in model_changed: merged_roles = _merge_lists(review.roles, [], limit=12) else: merged_roles = _merge_lists(review.roles, roles_out, limit=12) if trusted else _merge_lists(roles_out, review.roles, limit=12) if merged_roles != roles_out: changed.append("roles") roles_out = merged_roles skills_out = list(skills or []) if review.skills: merged_skills = _merge_lists(review.skills, skills_out, limit=64) if trusted else _merge_lists(skills_out, review.skills, limit=64) if merged_skills != skills_out: changed.append("skills") skills_out = merged_skills langs_out = list(primary_languages or []) review_langs = _normalize_language_list(review.primary_languages) if review_langs: if trusted and "primary_languages" in model_changed: merged_langs = _merge_lists(review_langs, [], limit=12) else: merged_langs = _merge_lists(review_langs, langs_out, limit=12) if trusted else _merge_lists(langs_out, review_langs, limit=12) if merged_langs != langs_out: changed.append("primary_languages") langs_out = merged_langs seniority_out = seniority if review.seniority and (trusted or not seniority_out): if review.seniority != seniority_out: changed.append("seniority") seniority_out = review.seniority backend_focus_out = backend_focus if review.backend_focus is not None and (trusted or backend_focus_out is None): if review.backend_focus != backend_focus_out: changed.append("backend_focus") backend_focus_out = review.backend_focus remote_out = remote if review.remote_ok is not None and (trusted or remote_out is None): if review.remote_ok != remote_out: changed.append("remote") remote_out = review.remote_ok location_out = location if review.location and (trusted or not location_out): loc = review.location.strip() if 2 <= len(loc) <= 120 and loc != (location_out or ""): changed.append("location") location_out = loc english_out = _normalize_cefr(english) review_english = _normalize_cefr(review.english_level) if review_english: if english_out is None: english_out = review_english changed.append("english") elif trusted and _EN_ORDER.get(review_english, 0) > _EN_ORDER.get(english_out, 0): english_out = review_english changed.append("english") exp_years_out = exp_years exp_years_eng_out = exp_years_eng exp_conf_out = exp_conf review_exp_total = _bounded_float(review.experience_years_total, 0.0, 60.0) review_exp_eng = _bounded_float(review.experience_years_engineering, 0.0, 60.0) if review_exp_total is not None: if exp_years_out is None or (trusted and ((exp_conf_out or 0.0) < 0.75)): if exp_years_out != review_exp_total: changed.append("experience_years_total") exp_years_out = review_exp_total exp_conf_out = max(float(exp_conf_out or 0.0), 0.78 if trusted else 0.65) if review_exp_eng is not None: if exp_years_eng_out is None or trusted: if exp_years_eng_out != review_exp_eng: changed.append("experience_years_engineering") exp_years_eng_out = review_exp_eng exp_conf_out = max(float(exp_conf_out or 0.0), 0.74 if trusted else 0.62) sal_min_out = sal_min sal_max_out = sal_max sal_conf_out = sal_conf cand_min = _bounded_int(review.salary_min_rub, 10_000, 200_000_000) cand_max = _bounded_int(review.salary_max_rub, 10_000, 200_000_000) if cand_min is None and cand_max is None: cand_min = _bounded_int(review.salary_min_usd, 100, 2_000_000) cand_max = _bounded_int(review.salary_max_usd, 100, 2_000_000) if cand_min is not None or cand_max is not None: if cand_min is not None and cand_max is not None and cand_min > cand_max: cand_min, cand_max = cand_max, cand_min if (sal_min_out is None and sal_max_out is None) or (trusted and (sal_conf_out is None or sal_conf_out < 0.75)): if cand_min is not None and cand_min != sal_min_out: sal_min_out = cand_min changed.append("salary") if cand_max is not None and cand_max != sal_max_out: sal_max_out = cand_max changed.append("salary") sal_conf_out = max(float(sal_conf_out or 0.0), 0.72 if trusted else 0.60) highlights_out = list(highlights or []) if review.highlights: merged_highlights = _merge_lists(review.highlights, highlights_out, limit=8) if trusted else _merge_lists(highlights_out, review.highlights, limit=8) if merged_highlights != highlights_out: highlights_out = merged_highlights changed.append("highlights") keywords_out = list(keywords or []) if review.keywords: merged_keywords = _merge_lists(review.keywords, keywords_out, limit=40) if trusted else _merge_lists(keywords_out, review.keywords, limit=40) if merged_keywords != keywords_out: keywords_out = merged_keywords changed.append("keywords") llm_tags_out = list(llm_tags or []) llm_tags_out = _merge_lists(keywords_out, llm_tags_out, limit=40) llm_tags_out = _merge_lists(skills_out, llm_tags_out, limit=40) llm_tags_out = _merge_lists(langs_out, llm_tags_out, limit=40) llm_summary_out = llm_summary if highlights_out: merged_summary = "; ".join([h.strip() for h in highlights_out if h.strip()])[:800] if merged_summary and merged_summary != (llm_summary_out or ""): llm_summary_out = merged_summary changed.append("llm_summary") changed_uniq = [] changed_seen = set() for item in changed: if item in changed_seen: continue changed_seen.add(item) changed_uniq.append(item) return ( { "roles": roles_out, "skills": skills_out, "primary_languages": langs_out, "seniority": seniority_out, "backend_focus": backend_focus_out, "remote": remote_out, "location": location_out, "english": english_out, "exp_years": exp_years_out, "exp_years_eng": exp_years_eng_out, "exp_conf": exp_conf_out, "sal_min": sal_min_out, "sal_max": sal_max_out, "sal_conf": sal_conf_out, "highlights": highlights_out, "keywords": keywords_out, "llm_summary": llm_summary_out, "llm_tags": llm_tags_out, }, { "trusted": trusted, "quality_score": quality_f, "changed_fields": changed_uniq, "issues_found": review_dbg.get("issues_found") or [], "model_changed_fields": review_dbg.get("changed_fields") or [], }, ) # ----------------------------- # candidate/resume DB helpers # ----------------------------- def stable_candidate_id(contacts: Dict[str, List[str]], name: Optional[str], simh: int) -> str: if contacts.get("email"): return "cand_" + sha1_str("email:" + contacts["email"][0]) if contacts.get("phone"): return "cand_" + sha1_str("phone:" + contacts["phone"][0]) if contacts.get("tg"): return "cand_" + sha1_str("tg:" + contacts["tg"][0]) if contacts.get("github"): return "cand_" + sha1_str("gh:" + contacts["github"][0]) if contacts.get("linkedin"): return "cand_" + sha1_str("li:" + contacts["linkedin"][0]) base = (name or "unknown").strip().lower() return "cand_" + sha1_str(f"name:{base}:{simh}") def _candidate_by_contact(con: sqlite3.Connection, contacts: Dict[str, List[str]]) -> Optional[str]: checks = [ ("email", contacts.get("email", [])), ("phone", contacts.get("phone", [])), ("tg", contacts.get("tg", [])), ("github", contacts.get("github", [])), ("linkedin", contacts.get("linkedin", [])), ] for ctype, vals in checks: for v in vals: row = con.execute( "SELECT candidate_id FROM candidate_contacts WHERE contact_type=? AND contact_value=?", (ctype, v), ).fetchone() if row: return row["candidate_id"] return None def _upsert_contacts(con: sqlite3.Connection, candidate_id: str, contacts: Dict[str, List[str]]) -> None: pairs: List[Tuple[str, str]] = [] for e in contacts.get("email", []): pairs.append(("email", e)) for p in contacts.get("phone", []): pairs.append(("phone", p)) for t in contacts.get("tg", []): pairs.append(("tg", t)) for g in contacts.get("github", []): pairs.append(("github", g)) for l in contacts.get("linkedin", []): pairs.append(("linkedin", l)) for ctype, val in pairs: con.execute( "INSERT OR IGNORE INTO candidate_contacts(contact_type, contact_value, candidate_id) VALUES (?,?,?)", (ctype, val, candidate_id), ) def _upsert_candidate_skills( con: sqlite3.Connection, candidate_id: str, skills_primary: List[str], skills_secondary: List[str], source: str, ) -> None: for sk in skills_primary: con.execute( """INSERT OR REPLACE INTO candidate_skills(candidate_id, skill_id, skill_label, confidence, source, evidence) VALUES (?,?,?,?,?,?)""", (candidate_id, sk, sk, 0.90, source, "skills_primary"), ) for sk in skills_secondary: con.execute( """INSERT OR REPLACE INTO candidate_skills(candidate_id, skill_id, skill_label, confidence, source, evidence) VALUES (?,?,?,?,?,?)""", (candidate_id, sk, sk, 0.60, source, "skills_secondary"), ) def _upsert_candidate_roles( con: sqlite3.Connection, candidate_id: str, roles: List[str], source: str, ) -> None: for r in roles: con.execute( """INSERT OR REPLACE INTO candidate_roles(candidate_id, role, confidence, source, evidence) VALUES (?,?,?,?,?)""", (candidate_id, r, 0.80, source, "roles"), ) def _upsert_candidate_languages( con: sqlite3.Connection, candidate_id: str, english_level: Optional[str], source: str, ) -> None: if not english_level: return con.execute( """INSERT OR REPLACE INTO candidate_languages(candidate_id, language, level, confidence, source, evidence) VALUES (?,?,?,?,?,?)""", (candidate_id, "english", english_level, 0.75, source, "english_level"), ) def _ensure_candidate(con: sqlite3.Connection, candidate_id: str, fields: Dict[str, Any]) -> None: # Attempt to ensure the new column exists if migration didn't run try: con.execute("ALTER TABLE candidates ADD COLUMN experience_years_eng REAL") except Exception: pass # Column likely exists or basic sqlite error, proceed to insert try: con.execute("ALTER TABLE candidates ADD COLUMN primary_languages_json TEXT") except Exception: pass try: con.execute("ALTER TABLE candidates ADD COLUMN backend_focus INTEGER") except Exception: pass exists = con.execute("SELECT 1 FROM candidates WHERE candidate_id=?", (candidate_id,)).fetchone() is not None primary_languages_json = safe_json(fields.get("primary_languages", [])) backend_focus_field = fields.get("backend_focus") backend_focus_int = None if backend_focus_field is None else (1 if backend_focus_field else 0) if not exists: con.execute( """INSERT INTO candidates( candidate_id, name, location, remote, experience_years, experience_years_eng, experience_confidence, salary_min, salary_max, salary_confidence, english_level, roles_json, skills_json, primary_languages_json, roles_norm, skills_norm, backend_focus, created_at, updated_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( candidate_id, fields.get("name"), fields.get("location"), fields.get("remote"), fields.get("experience_years"), fields.get("experience_years_eng"), # new field fields.get("experience_confidence"), fields.get("salary_min"), fields.get("salary_max"), fields.get("salary_confidence"), fields.get("english_level"), safe_json(fields.get("roles", [])), safe_json(fields.get("skills", [])), primary_languages_json, fields.get("roles_norm") or "|", fields.get("skills_norm") or "|", backend_focus_int, utc_iso(), utc_iso(), ), ) else: con.execute( """UPDATE candidates SET name = COALESCE(?, name), location = COALESCE(?, location), remote = COALESCE(?, remote), experience_years = COALESCE(?, experience_years), experience_years_eng = COALESCE(?, experience_years_eng), experience_confidence = COALESCE(?, experience_confidence), salary_min = COALESCE(?, salary_min), salary_max = COALESCE(?, salary_max), salary_confidence = COALESCE(?, salary_confidence), english_level = COALESCE(?, english_level), roles_json = CASE WHEN ? IS NOT NULL AND ? != '[]' THEN ? ELSE roles_json END, skills_json = CASE WHEN ? IS NOT NULL AND ? != '[]' THEN ? ELSE skills_json END, primary_languages_json = CASE WHEN ? IS NOT NULL AND ? != '[]' THEN ? ELSE primary_languages_json END, roles_norm = CASE WHEN ? != '|' THEN ? ELSE roles_norm END, skills_norm = CASE WHEN ? != '|' THEN ? ELSE skills_norm END, backend_focus = COALESCE(?, backend_focus), updated_at = ? WHERE candidate_id = ?""", ( fields.get("name"), fields.get("location"), fields.get("remote"), fields.get("experience_years"), fields.get("experience_years_eng"), # new field update fields.get("experience_confidence"), fields.get("salary_min"), fields.get("salary_max"), fields.get("salary_confidence"), fields.get("english_level"), safe_json(fields.get("roles", [])), safe_json(fields.get("roles", [])), safe_json(fields.get("roles", [])), safe_json(fields.get("skills", [])), safe_json(fields.get("skills", [])), safe_json(fields.get("skills", [])), primary_languages_json, primary_languages_json, primary_languages_json, fields.get("roles_norm") or "|", fields.get("roles_norm") or "|", fields.get("skills_norm") or "|", fields.get("skills_norm") or "|", backend_focus_int, utc_iso(), candidate_id, ), ) def _resume_by_sha(con: sqlite3.Connection, sha: str) -> Optional[str]: row = con.execute("SELECT resume_id FROM resumes WHERE sha256=?", (sha,)).fetchone() return row["resume_id"] if row else None def _near_duplicate_active_resume(con: sqlite3.Connection, simh: int, max_dist: int) -> Optional[Tuple[str, int]]: candidate_resume_ids = set() for bucket, band in simhash_bands(simh): cur = con.execute("SELECT resume_id FROM simhash_buckets WHERE bucket=? AND band=?", (bucket, band)) for r in cur.fetchall(): candidate_resume_ids.add(r["resume_id"]) best: Optional[Tuple[str, int]] = None for rid in candidate_resume_ids: row = con.execute("SELECT simhash FROM resumes WHERE resume_id=? AND is_active=1", (rid,)).fetchone() if not row or row["simhash"] is None: continue try: old = int(str(row["simhash"]), 16) except Exception: continue dist = hamming64(old, simh) if dist <= max_dist: if best is None or dist < best[1]: best = (rid, dist) return best def _insert_resume( con: sqlite3.Connection, candidate_id: str, sha: Optional[str], simh: int, clean_text: str, raw_text: str, extraction_json: str, llm_summary: Optional[str], llm_tags: List[str], extract_method: Optional[str], extract_quality_score: Optional[float], extract_quality_flags: Optional[str], extract_pages_json: Optional[str], doc_type: Optional[str], doc_type_confidence: Optional[float], parse_method: Optional[str], parse_version: Optional[str], sections_json: Optional[str], file_path: Optional[str], mtime: Optional[int], size: Optional[int], near_dup_of: Optional[str], ) -> str: resume_id = "res_" + uuid.uuid4().hex if near_dup_of: con.execute("UPDATE resumes SET is_active=0 WHERE resume_id=?", (near_dup_of,)) con.execute( """INSERT INTO resumes( resume_id, candidate_id, sha256, simhash, clean_text, raw_text, extraction_json, llm_summary, llm_tags_json, extract_method, extract_quality_score, extract_quality_flags, extract_pages_json, doc_type, doc_type_confidence, parse_method, parse_version, sections_json, is_active, duplicate_of_resume_id, file_path, file_mtime, file_size, created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( resume_id, candidate_id, sha, f"{simh:016x}", clean_text, raw_text[:250000], extraction_json, llm_summary, safe_json(llm_tags), extract_method, extract_quality_score, extract_quality_flags, extract_pages_json, doc_type, doc_type_confidence, parse_method, parse_version, sections_json, 1, near_dup_of, file_path, mtime, size, utc_iso(), ), ) for bucket, band in simhash_bands(simh): con.execute( "INSERT OR IGNORE INTO simhash_buckets(bucket, band, resume_id) VALUES (?,?,?)", (bucket, band, resume_id), ) return resume_id def _insert_source(con: sqlite3.Connection, resume_id: str, src: Dict[str, Any]) -> None: con.execute( """INSERT INTO sources( resume_id, export_path, chat_title, message_id, message_date, origin_type, original_file_path, original_file_name, extra_json ) VALUES (?,?,?,?,?,?,?,?,?)""", ( resume_id, src.get("export_path"), src.get("chat_title"), src.get("message_id"), src.get("message_date"), src.get("origin_type"), src.get("file_path"), src.get("original_name"), json.dumps(src.get("extra", {}), ensure_ascii=False), ), ) def _insert_positions( con: sqlite3.Connection, resume_id: str, candidate_id: str, positions: List[Dict[str, Any]], ) -> None: if not positions: return for p in positions: pos_id = "pos_" + uuid.uuid4().hex con.execute( """INSERT INTO positions( position_id, resume_id, candidate_id, title, company, date_from, date_to, is_current, description, stack_json ) VALUES (?,?,?,?,?,?,?,?,?,?)""", ( pos_id, resume_id, candidate_id, p.get("title"), p.get("company"), p.get("date_from"), p.get("date_to"), 1 if p.get("is_current") else 0 if p.get("is_current") is not None else None, p.get("description"), json.dumps(p.get("stack") or [], ensure_ascii=False), ), ) def _update_files_seen(con: sqlite3.Connection, sha: str, size: int, mtime: int, canonical_resume_id: str) -> None: con.execute( """INSERT INTO files_seen(sha256, size, mtime, canonical_resume_id, first_seen_at, last_seen_at) VALUES (?,?,?,?,?,?) ON CONFLICT(sha256) DO UPDATE SET size=excluded.size, mtime=excluded.mtime, canonical_resume_id=excluded.canonical_resume_id, last_seen_at=excluded.last_seen_at """, (sha, size, mtime, canonical_resume_id, utc_iso(), utc_iso()), ) # ----------------------------- # artifacts collection # ----------------------------- def collect_artifacts(input_root: Path) -> List[Dict[str, Any]]: artifacts: List[Dict[str, Any]] = [] for rj in find_result_json(input_root): artifacts.extend(list(iter_json_artifacts(rj))) for mh in find_messages_html(input_root): artifacts.extend(list(iter_html_artifacts(mh))) artifacts.extend(list(iter_file_scan(input_root))) return artifacts # ----------------------------- # main pipeline # ----------------------------- def import_exports( con: sqlite3.Connection, input_dir: str, log: Logger, max_near_dist: int = 6, min_text_len: int = 250, commit_every: int = 20, ) -> Dict[str, Any]: root = Path(input_dir).resolve() if not root.exists(): raise SystemExit(f"Input not found: {root}") artifacts = collect_artifacts(root) log.info(f"[import] artifacts found: {len(artifacts)}", {"input": str(root)}) stats: Dict[str, Any] = { "input": str(root), "artifacts": len(artifacts), "processed_new": 0, "dup_sha": 0, "near_dup": 0, "short_or_empty": 0, "errors": 0, "sources_added_only": 0, "llm_enriched": 0, "llm_reviewed": 0, "llm_review_changed": 0, } near_dup_examples: List[Dict[str, Any]] = [] for i, a in enumerate(artifacts, start=1): try: raw_text = "" sha = None file_path = a.get("file_path") size = None mtime = None extract_method = None extract_score = None extract_flags: List[str] = [] pages: List[Dict[str, Any]] = [] if file_path: fp = Path(file_path) if not fp.exists(): continue st = fp.stat() size = int(st.st_size) mtime = int(st.st_mtime) sha = sha256_file(str(fp)) existing_resume = _resume_by_sha(con, sha) if existing_resume: stats["dup_sha"] += 1 _insert_source(con, existing_resume, a) _update_files_seen(con, sha, size, mtime, existing_resume) stats["sources_added_only"] += 1 continue if fp.suffix.lower() == ".pdf": pdf_res = extract_pdf_best(fp, timeout_sec=25) raw_text = pdf_res.text extract_method = pdf_res.method extract_score = pdf_res.score extract_flags = pdf_res.flags pages = pdf_res.pages else: try: raw_text = extract_text_generic(fp) or "" except Exception as e: if log: log.warn("[extract] file failed - skipped", {"file": str(fp), "err": repr(e)}) raw_text = "" extract_method = f"file_{fp.suffix.lower().lstrip('.') or 'unknown'}" else: raw_text = a.get("message_text") or "" extract_method = "telegram_post" raw_text = coerce_text(raw_text) if not raw_text or len(raw_text.strip()) < min_text_len: stats["short_or_empty"] += 1 continue clean = normalize_text(raw_text) if not clean or len(clean) < min_text_len: stats["short_or_empty"] += 1 continue file_ext = Path(file_path).suffix.lower() if file_path else None dt = detect_doc_type(clean, file_ext=file_ext) if a.get("origin_type") == "message_text": from tg_resume_db.extract.doc_type import DocTypeResult dt = DocTypeResult(doc_type="telegram_post", confidence=0.92, signals=["telegram_message"]) if extract_flags and "scan_like" in extract_flags: from tg_resume_db.extract.doc_type import DocTypeResult dt = DocTypeResult(doc_type="scan_pdf", confidence=0.9, signals=dt.signals + ["scan_like"]) sections = split_sections(clean, dt.doc_type) sections_list = sections_present(sections) exp_section_text = sections.get("experience") if isinstance(sections, dict) else None positions = extract_positions(exp_section_text or clean) position_dicts = positions_to_dicts(positions) parser = tpl_generic if dt.confidence >= 0.8: if dt.doc_type == "hh_ru": parser = tpl_hh elif dt.doc_type == "linkedin_pdf": parser = tpl_linkedin elif dt.doc_type == "one_page_en": parser = tpl_one_page_en elif dt.doc_type == "one_page_ru": parser = tpl_one_page_ru elif dt.doc_type == "pptx_export": parser = tpl_pptx parsed = parser.parse_resume(clean, sections) parse_method = parsed.get("parse_method") or "generic_heur" contacts_raw = parsed.get("contacts_raw") or extract_contacts_raw(clean) contacts = normalize_contacts(contacts_raw, clean) name = parsed.get("name") or extract_name_guess(clean) remote = parsed.get("remote") if remote is None: remote = extract_remote(clean) english = parsed.get("english") or extract_english(clean) roles = parsed.get("roles") or [] skills = parsed.get("skills") or [] primary_languages: List[str] = [] location = parsed.get("location") or extract_location_best_effort(clean) exp_years = parsed.get("exp_years") exp_years_eng = parsed.get("exp_years_eng") exp_conf = parsed.get("exp_conf") exp_dbg = parsed.get("exp_dbg") or {} if exp_years is None and exp_years_eng is None: exp_years, exp_years_eng, exp_conf, exp_dbg = extract_experience_years(clean) exp_years, exp_years_eng, exp_conf, exp_dbg = _prefer_explicit_summary_experience( clean_text=clean, exp_years=exp_years, exp_years_eng=exp_years_eng, exp_conf=exp_conf, exp_dbg=exp_dbg, ) exp_years, exp_years_eng, exp_conf, exp_dbg = _reconcile_experience_fields( exp_years=exp_years, exp_years_eng=exp_years_eng, exp_conf=exp_conf, exp_dbg=exp_dbg, positions=position_dicts, ) sal_min = parsed.get("salary_min") sal_max = parsed.get("salary_max") sal_conf = parsed.get("salary_conf") sal_dbg = parsed.get("salary_dbg") or {} if sal_min is None and sal_max is None: sal_min, sal_max, sal_conf, sal_dbg = extract_salary(clean) llm_summary: Optional[str] = None llm_tags: List[str] = [] seniority: Optional[str] = None highlights: List[str] = [] keywords: List[str] = [] llm_enriched, llm_dbg = _maybe_llm_enrich( con=con, clean=clean, roles=roles, skills=skills, exp_conf=exp_conf, english=english, location=location, name=name, doc_type=dt.doc_type, sections=sections, ) backend_focus_flag: Optional[bool] = None if llm_enriched: parse_method = "llm_rag" stats["llm_enriched"] += 1 roles = _merge_lists(llm_enriched.roles, roles, limit=8) normalized_llm_langs = _normalize_language_list(llm_enriched.primary_languages) if normalized_llm_langs: primary_languages = _merge_lists(normalized_llm_langs, primary_languages, limit=8) skills = _merge_lists(normalized_llm_langs, skills, limit=48) skills = _merge_lists(llm_enriched.skills, skills, limit=48) if remote is None and llm_enriched.remote_ok is not None: remote = llm_enriched.remote_ok if not location and llm_enriched.location: location = llm_enriched.location if not english and llm_enriched.english_level and _can_accept_llm_english(clean, llm_enriched.english_level): english = llm_enriched.english_level backend_focus_flag = llm_enriched.backend_focus if llm_enriched.backend_focus is True: roles = _merge_lists(["backend"], roles, limit=8) elif llm_enriched.backend_focus is False: pruned_roles: List[str] = [] seen_roles = set() for r in roles: if r.lower() == "backend": continue rl = r.lower() if rl in seen_roles: continue seen_roles.add(rl) pruned_roles.append(r) roles = pruned_roles if (exp_conf is None or exp_conf < 0.6) and llm_enriched.experience_years_total is not None: exp_years = llm_enriched.experience_years_total exp_conf = 0.65 if llm_enriched.experience_years_engineering is not None: exp_years_eng = llm_enriched.experience_years_engineering sal_min, sal_max, sal_conf = _pick_salary( sal_min, sal_max, sal_conf, llm_enriched.salary_min_rub, llm_enriched.salary_max_rub ) if sal_min is None and sal_max is None: sal_min, sal_max, sal_conf = _pick_salary( sal_min, sal_max, sal_conf, llm_enriched.salary_min_usd, llm_enriched.salary_max_usd ) seniority = llm_enriched.seniority highlights = [h.strip() for h in llm_enriched.highlights if h.strip()] if highlights: llm_summary = "; ".join(highlights)[:800] keywords = _merge_lists(llm_enriched.keywords, keywords, limit=40) llm_tags = _merge_lists(llm_enriched.keywords, llm_tags, limit=24) llm_tags = _merge_lists(llm_enriched.skills, llm_tags, limit=24) llm_tags = _merge_lists(llm_enriched.primary_languages, llm_tags, limit=24) desired_title = parsed.get("desired_title") if desired_title: roles = _merge_lists(_roles_from_desired_title(desired_title), roles, limit=8) llm_review_mode = _llm_review_mode() llm_review_rounds_dbg: List[Dict[str, Any]] = [] llm_review_merge_dbg: List[Dict[str, Any]] = [] llm_review_used = False llm_review_changed = False if llm_parse_enabled() and _llm_review_needed( mode=llm_review_mode, llm_enriched_used=bool(llm_enriched), name=name, roles=roles, skills=skills, exp_conf=exp_conf, english=english, location=location, ): for _ in range(_llm_review_rounds()): review_draft = _build_llm_review_draft( roles=roles, skills=skills, primary_languages=primary_languages, seniority=seniority, backend_focus=backend_focus_flag, exp_years=exp_years, exp_years_eng=exp_years_eng, english=english, location=location, remote=remote, sal_min=sal_min, sal_max=sal_max, highlights=highlights, keywords=keywords, ) review_res, review_dbg = llm_review_profile( clean, draft=review_draft, con=con, doc_type=dt.doc_type, sections=sections, ) llm_review_rounds_dbg.append(review_dbg) if not review_res: continue llm_review_used = True merged, merge_dbg = _merge_review_result( review=review_res, review_dbg=review_dbg, roles=roles, skills=skills, primary_languages=primary_languages, seniority=seniority, backend_focus=backend_focus_flag, remote=remote, location=location, english=english, exp_years=exp_years, exp_years_eng=exp_years_eng, exp_conf=exp_conf, sal_min=sal_min, sal_max=sal_max, sal_conf=sal_conf, highlights=highlights, keywords=keywords, llm_summary=llm_summary, llm_tags=llm_tags, ) llm_review_merge_dbg.append(merge_dbg) roles = merged["roles"] skills = merged["skills"] primary_languages = merged["primary_languages"] seniority = merged["seniority"] backend_focus_flag = merged["backend_focus"] remote = merged["remote"] location = merged["location"] english = merged["english"] exp_years = merged["exp_years"] exp_years_eng = merged["exp_years_eng"] exp_conf = merged["exp_conf"] sal_min = merged["sal_min"] sal_max = merged["sal_max"] sal_conf = merged["sal_conf"] highlights = merged["highlights"] keywords = merged["keywords"] llm_summary = merged["llm_summary"] llm_tags = merged["llm_tags"] if merge_dbg.get("changed_fields"): llm_review_changed = True else: break if llm_review_used: stats["llm_reviewed"] += 1 if llm_review_changed: stats["llm_review_changed"] += 1 if "+llm_review" not in parse_method: parse_method = f"{parse_method}+llm_review" llm_review_meta = { "enabled": llm_parse_enabled(), "mode": llm_review_mode, "used": llm_review_used, "changed": llm_review_changed, "rounds": llm_review_rounds_dbg, "merge": llm_review_merge_dbg, } roles = normalize_roles(roles) roles = _prune_roles_by_evidence(roles, clean) skills = normalize_skills(skills) skills_primary, skills_secondary = split_skills_primary_secondary( skills, clean_text=clean, sections=sections, ) location = normalize_location(location) exp_years, exp_years_eng, exp_conf, exp_dbg = _reconcile_experience_fields( exp_years=exp_years, exp_years_eng=exp_years_eng, exp_conf=exp_conf, exp_dbg=exp_dbg, positions=position_dicts, ) if not primary_languages: language_from_skills = [] for sk in skills: tok = _norm_lang_token(sk) if tok: language_from_skills.append(tok) primary_languages = _merge_lists(language_from_skills, primary_languages, limit=8) skills, primary_languages = _drop_false_java(skills, primary_languages, clean) simh = simhash64(to_fts_text(clean)) candidate_id = _candidate_by_contact(con, contacts) or stable_candidate_id(contacts, name, simh) _ensure_candidate(con, candidate_id, { "name": name, "location": location, "remote": (1 if remote else 0) if remote is not None else None, "experience_years": exp_years, "experience_years_eng": exp_years_eng, # Passed to DB "experience_confidence": exp_conf if exp_years is not None else None, "salary_min": sal_min, "salary_max": sal_max, "salary_confidence": sal_conf if sal_min is not None else None, "english_level": english, "roles": roles, "skills": skills, "primary_languages": primary_languages, "backend_focus": backend_focus_flag, "roles_norm": norm_pipe(roles), "skills_norm": norm_pipe(skills), }) _upsert_contacts(con, candidate_id, contacts) _upsert_candidate_skills(con, candidate_id, skills_primary, skills_secondary, parse_method) _upsert_candidate_roles(con, candidate_id, roles, parse_method) _upsert_candidate_languages(con, candidate_id, english, parse_method) near = _near_duplicate_active_resume(con, simh, max_dist=max_near_dist) near_dup_of = near[0] if near else None if near_dup_of: stats["near_dup"] += 1 if len(near_dup_examples) < 10: near_dup_examples.append({ "new_file": file_path, "dup_of": near_dup_of, "dist": near[1], "candidate_id": candidate_id, }) extraction = { "name_guess": name, "contacts": contacts, "doc_type": { "type": dt.doc_type, "confidence": dt.confidence, "signals": dt.signals, }, "extract": { "method": extract_method, "quality_score": extract_score, "quality_flags": extract_flags, "pages": pages[:40], }, "sections_present": sections_list, "parse": { "method": parse_method, "version": _PARSE_VERSION, }, "desired_title": desired_title, "skills_primary": skills_primary, "skills_secondary": skills_secondary, "hh_meta": { "specializations": parsed.get("specializations"), "employment_type": parsed.get("employment_type"), "schedule": parsed.get("schedule"), }, "positions": positions_to_dicts(positions), "positions_count": len(position_dicts), "experience": { "years": exp_years, "years_engineering": exp_years_eng, # Saved in JSON too "confidence": exp_conf, "debug": exp_dbg }, "salary": {"min": sal_min, "max": sal_max, "confidence": sal_conf, "debug": sal_dbg}, "location_guess": location, "roles": roles, "skills": skills, "primary_languages": primary_languages, "remote_guess": remote, "english": english, "llm_summary": llm_summary, "llm_tags": llm_tags, "seniority": seniority, "backend_focus": backend_focus_flag, "highlights": highlights, "keywords": keywords, "llm": { "used": bool(llm_enriched), "debug": llm_dbg, "data": asdict(llm_enriched) if llm_enriched else None, "review": llm_review_meta, }, } resume_id = _insert_resume( con=con, candidate_id=candidate_id, sha=sha, simh=simh, clean_text=clean, raw_text=raw_text, extraction_json=json.dumps(extraction, ensure_ascii=False), llm_summary=llm_summary, llm_tags=llm_tags, extract_method=extract_method, extract_quality_score=extract_score, extract_quality_flags=json.dumps(extract_flags, ensure_ascii=False), extract_pages_json=json.dumps(pages[:40], ensure_ascii=False), doc_type=dt.doc_type, doc_type_confidence=dt.confidence, parse_method=parse_method, parse_version=_PARSE_VERSION, sections_json=json.dumps(sections, ensure_ascii=False), file_path=file_path, mtime=mtime, size=size, near_dup_of=near_dup_of, ) _insert_source(con, resume_id, a) _insert_positions(con, resume_id, candidate_id, position_dicts) if sha and size is not None and mtime is not None: _update_files_seen(con, sha, size, mtime, resume_id) stats["processed_new"] += 1 if i % commit_every == 0: con.commit() log.info( f"[import] progress {i}/{len(artifacts)} " f"new={stats['processed_new']} dup_sha={stats['dup_sha']} " f"near={stats['near_dup']} err={stats['errors']}", {}, ) except Exception as e: stats["errors"] += 1 log.error("[import] artifact failed", {"err": repr(e), "artifact": a}) con.commit() stats["near_dup_examples"] = near_dup_examples log.info("[import] done", stats) return stats