from __future__ import annotations import json import re import sqlite3 from dataclasses import dataclass, asdict from typing import Any, Dict, List, Optional, Set, Tuple try: import httpx # type: ignore except Exception: # pragma: no cover httpx = None # type: ignore from tg_resume_db.search import search_with_filters from tg_resume_db.extract.parse import ( extract_remote, extract_english, extract_location_best_effort, extract_roles_skills, extract_salary, ) from tg_resume_db.extract.clean import normalize_text from tg_resume_db.extract.llm import resolve_llm_runtime from tg_resume_db.normalize import normalize_skill, find_skills_in_text # --------- Search plan (LLM outputs THIS, not SQL) ---------- @dataclass class SearchPlan: query_text: str = "" # full-text query (FTS) skills_any: List[str] = None # at least one must match skills_all: List[str] = None # all must match roles_any: List[str] = None location: Optional[str] = None remote: Optional[bool] = None english_min: Optional[str] = None # e.g. 
A1..C2 exp_years_min: Optional[float] = None salary_min: Optional[int] = None salary_max: Optional[int] = None limit: int = 20 sort: str = "rank" # rank | exp_desc | salary_desc def __post_init__(self): self.skills_any = self.skills_any or [] self.skills_all = self.skills_all or [] self.roles_any = self.roles_any or [] _ALLOWED_PLAN_KEYS = { "query_text", "skills_any", "skills_all", "roles_any", "location", "remote", "english_min", "exp_years_min", "salary_min", "salary_max", "limit", "sort", } # --------- Text helpers ---------- _EN_ORDER = {"A1": 1, "A2": 2, "B1": 3, "B2": 4, "C1": 5, "C2": 6} def _norm_token(s: str) -> str: s = (s or "").strip().lower() s = re.sub(r"\s+", " ", s) return s def _uniq_keep_order(xs: List[str]) -> List[str]: seen = set() out: List[str] = [] for x in (xs or []): x = _norm_token(str(x)) if not x or x in seen: continue seen.add(x) out.append(x) return out def _filter_skills_vs_location(skills: List[str], location: Optional[str]) -> List[str]: if not skills: return [] bad = set() if location: bad.add(_norm_token(location)) for w in [ "москва", "санкт-петербург", "спб", "питер", "екатеринбург", "минск", "алматы", "remote", "удаленно", "удалённо", "удаленка", "удалёнка", "гибрид", "hybrid", "офис", "office", "onsite", "on-site", ]: bad.add(w) return [s for s in skills if _norm_token(s) not in bad] # ---- Name-list detection (чтобы не ужимать фильтрами запрос "списком ФИО") ---- _NAME_RE = re.compile(r"\b[А-ЯЁA-Z][а-яёa-z]+(?:[-\s]+[А-ЯЁA-Z][а-яёa-z]+)+\b") def _looks_like_name_list(user_prompt: str) -> bool: """ Heuristic: если в запросе несколько строк с ФИО, считаем это прямой поиск по именам и не жёстко фильтруем по стеку/опыту. 
""" if not user_prompt: return False matches = _NAME_RE.findall(user_prompt) if len(matches) >= 3: return True # lines with at least one full name lines = [ln.strip() for ln in user_prompt.splitlines() if ln.strip()] name_lines = sum(1 for ln in lines if _NAME_RE.search(ln)) return name_lines >= 2 and len(matches) >= 2 # ---- Work mode: hybrid must NOT force remote=true ---- _HYBRID_RE = re.compile(r"\b(гибрид|hybrid)\b", re.I) _REMOTE_RE = re.compile(r"\b(remote|удал(ен|ён|енно|ённо)?|удаленк|удалёнк|дистанц)\b", re.I) _OFFICE_RE = re.compile(r"\b(офис|office|on[-\s]?site|onsite|в офисе|на месте)\b", re.I) def _apply_work_mode_overrides(user_prompt: str, plan: SearchPlan) -> None: """ Принудительно правим plan.remote по тексту запроса: - "гибрид" => remote = None (не фильтруем) - "офис/onsite" => remote = False - "remote/удаленно" => remote = True """ t = (user_prompt or "").lower() if _HYBRID_RE.search(t): plan.remote = None return if _OFFICE_RE.search(t): plan.remote = False return if _REMOTE_RE.search(t): plan.remote = True return def _simplify_query_text(user_prompt: str, skills_any: List[str]) -> str: """ FTS-поиск может ухудшаться, если query_text перегружен. Если в запросе явно стек (3+ технологий) — оставим краткий search intent. 
""" up = (user_prompt or "").strip() if len(skills_any) >= 3: # максимально безопасно и универсально if re.search(r"\bbackend\b", up, re.I) or "бэкенд" in up.lower(): return "backend developer" return "developer" return up # --------- sanitize helpers ---------- def _as_list(v: Any) -> List[str]: if v is None: return [] if isinstance(v, list): return [str(x) for x in v if str(x).strip()] s = str(v).strip() if not s: return [] return [x.strip() for x in s.split(",") if x.strip()] def _to_bool(v: Any) -> Optional[bool]: if v is None: return None if isinstance(v, bool): return v s = str(v).strip().lower() if s in ("true", "1", "yes", "y", "да", "д"): return True if s in ("false", "0", "no", "n", "нет", "н"): return False return None def _to_int(v: Any) -> Optional[int]: if v is None: return None try: return int(float(v)) except Exception: return None def _to_float(v: Any) -> Optional[float]: if v is None: return None try: return float(v) except Exception: return None def _sanitize_plan_dict(obj: Any) -> Dict[str, Any]: """ Убираем лишние ключи (например, user_prompt) и приводим типы. Лечит: SearchPlan.__init__() got an unexpected keyword argument ... 
""" if not isinstance(obj, dict): return {} clean: Dict[str, Any] = {} for k, v in obj.items(): if k not in _ALLOWED_PLAN_KEYS: continue clean[k] = v if "skills_any" in clean: clean["skills_any"] = _as_list(clean["skills_any"]) if "skills_all" in clean: clean["skills_all"] = _as_list(clean["skills_all"]) if "roles_any" in clean: clean["roles_any"] = _as_list(clean["roles_any"]) if "remote" in clean: clean["remote"] = _to_bool(clean["remote"]) if "salary_min" in clean: clean["salary_min"] = _to_int(clean["salary_min"]) if "salary_max" in clean: clean["salary_max"] = _to_int(clean["salary_max"]) if "exp_years_min" in clean: clean["exp_years_min"] = _to_float(clean["exp_years_min"]) if "limit" in clean: lim = _to_int(clean["limit"]) clean["limit"] = lim if lim is not None else 20 if "sort" in clean: clean["sort"] = str(clean["sort"] or "").strip() if "location" in clean and clean["location"] is not None: loc = str(clean["location"]).strip() clean["location"] = loc if loc else None if "english_min" in clean and clean["english_min"] is not None: eng = str(clean["english_min"]).strip().upper() clean["english_min"] = eng if eng else None if "query_text" in clean and clean["query_text"] is not None: clean["query_text"] = str(clean["query_text"]).strip() return clean # --------- heuristic plan ---------- def _heuristic_plan(user_prompt: str) -> SearchPlan: # Если запрос похож на список имён — ищем по тексту без лишних фильтров if _looks_like_name_list(user_prompt): return SearchPlan( query_text=user_prompt.strip(), skills_any=[], skills_all=[], roles_any=[], location=None, remote=None, english_min=None, exp_years_min=None, salary_min=None, salary_max=None, limit=20, sort="rank", ) text = normalize_text(user_prompt) roles, skills = extract_roles_skills(text) location = extract_location_best_effort(text) remote = extract_remote(text) english = extract_english(text) sal_min, sal_max, sal_conf, _ = extract_salary(text) skills = _filter_skills_vs_location(skills, location) roles 
= _uniq_keep_order(roles) skills = _uniq_keep_order(skills) plan = SearchPlan( query_text=_simplify_query_text(user_prompt, skills), skills_any=skills[:12], roles_any=(["backend"] if ("backend" in roles or "backend" in user_prompt.lower()) else roles[:6]), location=location, remote=remote, english_min=english, salary_min=sal_min if sal_conf and sal_conf >= 0.4 else None, salary_max=sal_max if sal_conf and sal_conf >= 0.4 else None, limit=20, sort="rank", ) _apply_work_mode_overrides(user_prompt, plan) return plan # --------- Optional LLM (OpenAI-compatible base_url) ---------- def _llm_enabled() -> bool: if httpx is None: return False runtime = resolve_llm_runtime() return bool(runtime.get("base_url")) and bool(runtime.get("model")) def _llm_call_json(messages: List[Dict[str, str]]) -> Dict[str, Any]: if httpx is None: raise RuntimeError("httpx is not installed") runtime = resolve_llm_runtime() base_url = runtime.get("base_url", "").rstrip("/") model = runtime.get("model", "") api_key = runtime.get("api_key", "") if not base_url or not model: raise RuntimeError("LLM runtime is not configured") payload = {"model": model, "messages": messages, "temperature": 0.2} headers = {"Content-Type": "application/json"} if api_key: headers["Authorization"] = f"Bearer {api_key}" with httpx.Client(timeout=30.0) as client: r = client.post(f"{base_url}/chat/completions", headers=headers, json=payload) r.raise_for_status() data = r.json() content = data["choices"][0]["message"]["content"] m = re.search(r"\{.*\}", content, flags=re.S) if not m: raise ValueError("LLM did not return JSON") return json.loads(m.group(0)) def _llm_build_plan(user_prompt: str, draft: SearchPlan) -> SearchPlan: schema_hint = { "query_text": "string", "skills_any": ["string"], "skills_all": ["string"], "roles_any": ["string"], "location": "string|null", "remote": "bool|null", "english_min": "A1|A2|B1|B2|C1|C2|null", "exp_years_min": "number|null", "salary_min": "int|null", "salary_max": "int|null", "limit": 
"int", "sort": "rank|exp_desc|salary_desc", } msgs = [ { "role": "system", "content": ( "Ты превращаешь запрос рекрутера в JSON-фильтры поиска по базе резюме.\n" "НЕЛЬЗЯ писать SQL. Верни ТОЛЬКО JSON объекта SearchPlan.\n" f"Schema: {json.dumps(schema_hint, ensure_ascii=False)}\n" "ВАЖНО:\n" "- Никаких лишних ключей - только поля Schema.\n" "- Не добавляй в skills города/локации.\n" "- 'гибрид' НЕ означает remote=true (если видишь 'гибрид' - remote=null).\n" "- Старайся делать поиск широким: skills_all используй ТОЛЬКО если явно попросили обязательные навыки.\n" "- Если в запросе есть указание уровня английского (например B2+), заполни english_min.\n" "- Если явно указан опыт 'N+' лет - поставь exp_years_min=N.\n" ), }, { "role": "user", "content": ( f"Запрос: {user_prompt}\n\n" f"Черновик (эвристика): {json.dumps(asdict(draft), ensure_ascii=False)}" ), }, ] obj_raw = _llm_call_json(msgs) obj = _sanitize_plan_dict(obj_raw) plan = SearchPlan(**{**asdict(draft), **obj}) plan.skills_any = _uniq_keep_order(_filter_skills_vs_location(plan.skills_any, plan.location)) plan.skills_all = _uniq_keep_order(_filter_skills_vs_location(plan.skills_all, plan.location)) plan.roles_any = _uniq_keep_order(plan.roles_any) # мягко улучшим query_text plan.query_text = _simplify_query_text(user_prompt, plan.skills_any) plan.limit = max(5, min(int(plan.limit or 20), 50)) if plan.sort not in ("rank", "exp_desc", "salary_desc"): plan.sort = "rank" # fallback: если LLM обнулил важные поля - вернём эвристику if not plan.skills_any: plan.skills_any = draft.skills_any if not plan.skills_all: plan.skills_all = draft.skills_all if plan.english_min is None and draft.english_min is not None: plan.english_min = draft.english_min if plan.exp_years_min is None: try: req_exp = _extract_required_exp_years(user_prompt) if req_exp is not None: plan.exp_years_min = req_exp except Exception: pass _apply_work_mode_overrides(user_prompt, plan) return plan # --------- post processing: dedupe + "real fit" 
# --------- post processing: dedupe + "real fit" filter ----------

_CORE = {"java", "kotlin", "python", "go", "golang"}
_BONUS = {"c++", "cpp"}

_LANG_VARIANTS = {
    "java": {"java"},
    "kotlin": {"kotlin"},
    "python": {"python"},
    "go": {"go", "golang"},
    "c++": {"c++", "cpp", "c plus plus"},
    "c#": {"c#", "csharp"},
}

_SKILL_EVIDENCE_ALIASES = {
    "go": {"go", "golang"},
    "golang": {"go", "golang"},
    "kubernetes": {"kubernetes", "k8s"},
    "postgresql": {"postgresql", "postgres", "postgre sql", "postgre-sql", "psql"},
    "javascript": {"javascript", "java script", "js"},
    "typescript": {"typescript", "type script", "ts"},
    "nodejs": {"nodejs", "node js", "node.js", "node"},
    "grpc": {"grpc", "g rpc"},
    "graphql": {"graphql", "graph ql"},
    "ci/cd": {"ci/cd", "ci cd", "cicd"},
    "c++": {"c++", "cpp", "c plus plus"},
    "c#": {"c#", "csharp", "c sharp"},
    "dotnet": {"dotnet", ".net"},
    "aws": {"aws", "amazon web services"},
    "gcp": {"gcp", "google cloud", "google cloud platform"},
    "redis": {"redis"},
    "kafka": {"kafka"},
    "docker": {"docker"},
}

_GENERIC_SKIP_SKILLS = {
    "backend",
    "frontend",
    "fullstack",
    "developer",
    "engineer",
    "senior",
    "middle",
    "junior",
    "lead",
}

_DOMAIN_VARIANTS = {
    "fintech": {
        "fintech",
        "финтех",
        "bank",
        "banking",
        "бан",
        "payment",
        "payments",
        "card",
        "cards",
        "sber",
        "тбанк",
        "tinkoff",
        "visa",
        "mastercard",
        "trading",
        "exchange",
        "crypto",
        "крипт",
        "биржа",
    },
    "ecommerce": {
        "ecommerce",
        "e-commerce",
        "marketplace",
        "retail",
        "checkout",
        "cart",
        "онлайн магазин",
    },
    "gamedev": {"gamedev", "game dev", "gaming", "unity", "unreal", "игр"},
    "healthcare": {"healthcare", "medtech", "hospital", "clinic", "мед", "health tech"},
}


def _token_in_text(text: str, token: str) -> bool:
    """Return True when ``token`` occurs in ``text`` as a whole token (case-insensitive).

    Lookarounds are used instead of ``\\b`` because tokens such as "c++",
    "c#" or ".net" begin or end with non-word characters.

    NOTE(review): the pattern in the received source was truncated after
    ``r"(?``; this is a reconstruction - confirm against the original file.
    """
    if not text or not token:
        return False
    pat = r"(?<!\w)" + re.escape(token) + r"(?!\w)"
    return re.search(pat, text, flags=re.I) is not None


# NOTE(review): the ``def`` line of this helper was lost in the received source;
# only "-> bool:" and the body survived. The name and parameter order are
# inferred from the body and from the sibling ``_domain_hit`` - confirm.
def _lang_hit(text: str, canon_lang: str) -> bool:
    """Return True when any spelling of the canonical language occurs in ``text``."""
    aliases = _LANG_VARIANTS.get(canon_lang, {canon_lang})
    return any(_token_in_text(text, tok) for tok in aliases)


def _skill_aliases(skill: str) -> List[str]:
    """Return every known spelling of ``skill`` (normalized, de-duplicated).

    The canonical form comes first; the rest is sorted so the result does not
    depend on set iteration order.
    """
    canon = normalize_skill(skill) or _norm_token(skill)
    if not canon:
        return []
    candidates: List[str] = [canon, _norm_token(skill)]
    candidates.extend(sorted(_SKILL_EVIDENCE_ALIASES.get(canon, ())))
    candidates.extend(sorted(_LANG_VARIANTS.get(canon, ())))
    return _uniq_keep_order(candidates)


def _extract_required_skills(user_prompt: str, plan: Optional[SearchPlan], req_langs: Set[str]) -> List[str]:
    """Collect up to 10 canonical skills from the plan, the prompt text and ``req_langs``.

    Generic words (backend, senior, ...) are skipped; order is plan.skills_all,
    plan.skills_any, skills found in the prompt, then required languages.
    """
    raw: List[str] = []
    if plan:
        raw.extend(plan.skills_all or [])
        raw.extend(plan.skills_any or [])
    raw.extend(find_skills_in_text(user_prompt or ""))
    raw.extend(req_langs or ())

    out: List[str] = []
    seen = set()
    for s in raw:
        canon = normalize_skill(s) or _norm_token(s)
        if not canon:
            continue
        canon = _norm_token(canon)
        if canon in _GENERIC_SKIP_SKILLS or canon in seen:
            continue
        seen.add(canon)
        out.append(canon)
    return out[:10]


def _query_stack_is_strict(user_prompt: str) -> bool:
    """True when the prompt says the stack is mandatory, or lists it with commas and no "or"."""
    t = (user_prompt or "").lower()
    if any(w in t for w in ("обязательно", "строго", "must", "required", "mandatory", "без этого")):
        return True
    return "," in t and " или " not in t and " or " not in t


def _extract_required_domains(user_prompt: str) -> List[str]:
    """Return the canonical domains whose variants occur (as substrings) in the prompt."""
    t = (user_prompt or "").lower()
    return [canon for canon, variants in _DOMAIN_VARIANTS.items() if any(v in t for v in variants)]


def _domain_hit(text: str, domain: str) -> bool:
    """True when any variant of ``domain`` occurs (as a substring) in ``text``."""
    variants = _DOMAIN_VARIANTS.get(domain, set())
    txt = (text or "").lower()
    return any(v in txt for v in variants)


def _load_resume_contexts(
    con: sqlite3.Connection,
    items: List[Dict[str, Any]],
) -> Dict[str, Dict[str, str]]:
    """Load lower-cased text contexts for the resumes referenced by ``items``.

    Returns ``{resume_id: {"skills_text", "body_text", "clean_text"}}``.
    Best effort: any SQLite error yields ``{}`` and malformed JSON columns are
    treated as empty.
    """
    resume_ids: List[str] = []
    seen = set()
    for it in items or []:
        rid = str(it.get("resume_id") or "").strip()
        if not rid or rid in seen:
            continue
        seen.add(rid)
        resume_ids.append(rid)
    if not resume_ids:
        return {}

    # Query in chunks: SQLite limits the number of "?" host parameters per
    # statement (999 in older builds).
    chunk_size = 500
    rows: List[Any] = []
    try:
        for start in range(0, len(resume_ids), chunk_size):
            chunk = resume_ids[start:start + chunk_size]
            ph = ",".join("?" for _ in chunk)
            sql = (
                f"SELECT resume_id, clean_text, sections_json, extraction_json "
                f"FROM resumes WHERE resume_id IN ({ph})"
            )
            rows.extend(con.execute(sql, chunk).fetchall())
    except sqlite3.Error:
        return {}

    def _json_dict(raw_value: Any) -> Dict[str, Any]:
        """Parse a JSON column; anything that is not a JSON object becomes {}."""
        try:
            parsed = json.loads(raw_value or "{}")
        except (TypeError, ValueError):
            return {}
        return parsed if isinstance(parsed, dict) else {}

    out: Dict[str, Dict[str, str]] = {}
    for r in rows:
        # Positional access works with plain tuples and with sqlite3.Row alike.
        rid = str(r[0])
        clean = str(r[1] or "")
        sections = _json_dict(r[2])
        extraction = _json_dict(r[3])

        skills_text = str(sections.get("skills") or "")
        body_parts: List[str] = []
        for key in ("about", "summary", "experience", "projects", "work"):
            val = sections.get(key)
            if val:
                body_parts.append(str(val))
        for p in extraction.get("positions") or []:
            if not isinstance(p, dict):
                continue
            body_parts.append(str(p.get("title") or ""))
            body_parts.append(str(p.get("company") or ""))
            body_parts.append(str(p.get("description") or ""))

        body_text = "\n".join(body_parts).strip()
        # fallback for badly split templates
        if len(body_text) < 80:
            body_text = clean
        # Remove the skills section so that "body" evidence is never just the
        # skill list itself.
        # NOTE(review): the collapsed source does not show whether this step
        # was nested under the fallback above; applying it always is a no-op
        # unless the body contains the skills section verbatim.
        if skills_text:
            body_text = body_text.replace(skills_text, " ")

        out[rid] = {
            "skills_text": skills_text.lower(),
            "body_text": body_text.lower(),
            "clean_text": clean.lower(),
        }
    return out


def _normalize_lang_token(token: str) -> Optional[str]:
    """Map a language spelling to its canonical name from ``_LANG_VARIANTS``, else None."""
    t = _norm_token(token)
    if not t:
        return None
    for canon, aliases in _LANG_VARIANTS.items():
        if t == canon or t in aliases:
            return canon
    return None


def _extract_required_languages(user_prompt: str) -> List[str]:
    """Return canonical languages mentioned in the prompt as whole tokens."""
    t = (user_prompt or "").lower()
    return [
        canon
        for canon, aliases in _LANG_VARIANTS.items()
        if any(_token_in_text(t, alias) for alias in aliases)
    ]


def _dedupe_by_candidate_best_rank(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep one item per ``candidate_id`` - the one with the best (lowest) rank.

    Items without a ``candidate_id`` are dropped. Output order follows the
    first appearance of each candidate.
    """
    best: Dict[str, Dict[str, Any]] = {}
    for it in items or []:
        cid = it.get("candidate_id") or ""
        if not cid:
            continue
        if cid not in best:
            best[cid] = it
            continue
        # rank: the smaller (more negative) the value, the "higher" the hit
        r_new = it.get("rank")
        r_old = best[cid].get("rank")
        try:
            if r_old is None or (r_new is not None and float(r_new) < float(r_old)):
                best[cid] = it
        except (TypeError, ValueError):
            # Non-numeric rank: keep the item we already have.
            pass
    return list(best.values())


def _needs_postfilter(user_prompt: str) -> bool:
    """
    Enable the strict "vacancy" filter when the query looks like a vacancy:
    - an experience requirement ("experience from N years", "5+")
    - an explicit stack (2+ skills, a domain plus a skill, or 2+ languages)
    """
    if _looks_like_name_list(user_prompt):
        return False
    t = (user_prompt or "").lower()
    if re.search(r"(опыт|experience).{0,20}(\d+)\s*\+|\b(\d+)\s*\+\s*лет", t):
        return True
    # Also covers "опыт от 5 лет" (no "+"), which the pattern above misses.
    if _extract_required_exp_years(user_prompt) is not None:
        return True
    skill_hits = len(find_skills_in_text(t))
    if skill_hits >= 2:
        return True
    if _extract_required_domains(user_prompt) and skill_hits >= 1:
        return True
    # Stack fallback: count distinct languages matched as whole tokens, so
    # "golang" is one language and "django" does not count as "go".
    return len(_extract_required_languages(user_prompt)) >= 2


_EXCLUDE_LOC_MARKERS = {
    "россия",
    "russia",
    "rf",
    "russian federation",
    "moscow",
    "москва",
    "москв",
    "spb",
    "petersburg",
    "петербург",
    "санкт",
    "мск",
    "belarus",
    "беларусь",
    "белоруссия",
    "iran",
    "ирак",
    "iraq",
    "пакистан",
    "pakistan",
    "india",
    "индия",
    "африк",
}

# Markers are stems, so they match only at the start of a word ("москв" ->
# "москвы"), never in the middle of one ("rf" must not hit "interface").
_EXCLUDE_LOC_RE = re.compile(
    r"(?<!\w)(?:"
    + "|".join(re.escape(m) for m in sorted(_EXCLUDE_LOC_MARKERS, key=len, reverse=True))
    + r")"
)
# Negation words as whole words only ("не" must not hit "домене").
_LOC_NEGATION_RE = re.compile(r"(?<!\w)(?:кроме|except|не)(?!\w)")


def _location_exclusion_requested(user_prompt: str) -> bool:
    """True when the prompt mentions an excluded-location marker together with a negation word."""
    t = (user_prompt or "").lower()
    return bool(_EXCLUDE_LOC_RE.search(t)) and bool(_LOC_NEGATION_RE.search(t))


_YEARS_UNIT = r"(?:лет|года?|years?|yrs?)\b"


def _extract_required_exp_years(user_prompt: str) -> Optional[float]:
    """Extract the required years of experience from the prompt, or None.

    Recognizes "опыт ... N лет" / "experience ... N years" (optionally "N+")
    and a bare "N+ лет" / "N+ years".
    """
    t = (user_prompt or "").lower()
    # Lazy gap + "not preceded by a digit" so that the whole number is taken:
    # a greedy gap would read "опыт 10 лет" as 0.
    m = re.search(
        r"(?:опыт|experience).{0,20}?(?<![\d.,])(\d+(?:[.,]\d+)?)\s*\+?\s*" + _YEARS_UNIT,
        t,
    )
    if not m:
        m = re.search(r"\b(\d+(?:[.,]\d+)?)\s*\+\s*" + _YEARS_UNIT, t)
    if not m:
        return None
    return float(m.group(1).replace(",", "."))


def _extract_required_english(user_prompt: str) -> Optional[str]:
    """Extract the required English level (A1..C2) from the prompt, or None."""
    t = (user_prompt or "").upper()
    m = re.search(r"\b([ABC][12])(?!\w)", t)
    if m:
        return m.group(1)
    # textual
    if "FLUENT" in t or "ADVANCED" in t or "PROFICIENT" in t:
        return "C1"
    if "UPPER" in t and "INTERMEDIATE" in t:
        return "B2"
    # Must be tested before the plain INTERMEDIATE check below.
    if re.search(r"PRE[-\s]?INTERMEDIATE", t):
        return "A2"
    if "INTERMEDIATE" in t:
        return "B1"
    return None


def _jobfit_filter_items(
    con: sqlite3.Connection,
    user_prompt: str,
    items: List[Dict[str, Any]],
    plan: Optional[SearchPlan] = None,
) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Strict vacancy-style filter over already-found items:
    - experience >= required (if specified; defaults to 4 years for Go queries)
    - every language named in the query is among the candidate's languages;
      without named languages at least one CORE/bonus language is required
    - must-have skills are present and backed by evidence outside the
      "skills" section of the resume
    - required domains are present (strict queries only)
    - English >= required level
    - backend is not mandatory but affects the ordering

    Returns ``(kept_items, debug_info)``. Each kept item is a copy with an
    extra ``"_fit"`` dict; ``debug_info["reasons"]`` counts drops per reason.
    """
    req_exp = _extract_required_exp_years(user_prompt)  # e.g. 5.0
    req_langs = set(_extract_required_languages(user_prompt))
    req_english = _extract_required_english(user_prompt)
    req_skills = _extract_required_skills(user_prompt, plan, req_langs)
    req_domains = _extract_required_domains(user_prompt)
    strict_stack = _query_stack_is_strict(user_prompt) or (req_exp is not None)

    must_have_skills = _uniq_keep_order(
        [normalize_skill(s) or s for s in ((plan.skills_all or []) if plan else [])]
    )
    if not must_have_skills and strict_stack and req_skills:
        # For short stacks in a vacancy every element is treated as mandatory;
        # for longer ones only the first four.
        must_have_skills = req_skills[:4]

    filtered: List[Dict[str, Any]] = []
    reasons: Dict[str, int] = {}
    exclude_ru = _location_exclusion_requested(user_prompt)

    # If Go is explicitly requested and no number of years is given, default
    # to a minimum of 4 years.
    if req_exp is None and ("go" in req_langs or "golang" in req_langs):
        req_exp = 4.0

    resume_ctx = _load_resume_contexts(con, items)

    def _drop(reason: str) -> None:
        reasons[reason] = reasons.get(reason, 0) + 1

    for it in items or []:
        roles = set(it.get("roles") or [])
        skills = {_norm_token(s) for s in (it.get("skills") or [])}
        skills.update(_norm_token(pl) for pl in (it.get("primary_languages") or []))

        # Prefer engineering years for filtering; fall back to total years.
        exp_val = _to_float(it.get("experience_years_eng"))
        if exp_val is None:
            exp_val = _to_float(it.get("experience_years"))

        if req_exp is not None and (exp_val is None or exp_val < req_exp):
            _drop("exp_lt_required")
            continue

        loc = (it.get("location") or "").lower()
        if exclude_ru and _EXCLUDE_LOC_RE.search(loc):
            _drop("location_excluded")
            continue

        lang_tokens: Set[str] = set()
        for lang in (it.get("primary_languages") or []):
            norm = _normalize_lang_token(lang)
            if norm:
                lang_tokens.add(norm)
        if not lang_tokens:
            # No primary languages recorded: derive them from the skill list.
            for sk in skills:
                norm = _normalize_lang_token(sk)
                if norm:
                    lang_tokens.add(norm)

        # Basic check for the language stack: every requested language must be present.
        if any(
            req_lang not in lang_tokens and req_lang in ("go", "python", "java", "kotlin", "c++", "c#")
            for req_lang in req_langs
        ):
            _drop("no_primary_required_lang")
            continue

        rid = str(it.get("resume_id") or "")
        ctx = resume_ctx.get(rid) or {}
        ctx_body = str(ctx.get("body_text") or "")
        ctx_skills = str(ctx.get("skills_text") or "")
        ctx_clean = str(ctx.get("clean_text") or "")
        ctx_domain = "\n".join([ctx_body, ctx_clean, str(it.get("snippet") or "").lower()])

        # Evidence-based skill validation (not only for Go):
        # must-have skills must not appear only in the "skills" section.
        skill_hits_total = 0
        skill_hits_body = 0
        missing_must = 0
        skills_only_must = 0
        skills_only_critical = 0
        for req_skill in req_skills:
            aliases = _skill_aliases(req_skill)
            if not aliases:
                continue
            hit_body = any(_token_in_text(ctx_body, a) for a in aliases)
            hit_skills = any(_token_in_text(ctx_skills, a) for a in aliases)
            hit_any = hit_body or hit_skills or any(_norm_token(req_skill) == _norm_token(s) for s in skills)
            if hit_any:
                skill_hits_total += 1
            if hit_body:
                skill_hits_body += 1
            if req_skill in must_have_skills:
                if not hit_any:
                    missing_must += 1
                elif not hit_body and hit_skills:
                    skills_only_must += 1
                    # A language requirement found only in the skill list is critical.
                    if _normalize_lang_token(req_skill) is not None:
                        skills_only_critical += 1

        if missing_must > 0:
            _drop("required_skill_missing")
            continue

        # Cut hard when a key language requirement exists only in the skill
        # list, or when the whole must-have stack is unconfirmed by experience.
        if strict_stack and (
            skills_only_critical > 0
            or (must_have_skills and skills_only_must >= len(must_have_skills))
        ):
            _drop("required_skill_only_in_skills")
            continue

        if req_skills and strict_stack:
            min_hits = len(must_have_skills) if must_have_skills else (2 if len(req_skills) >= 2 else 1)
            if skill_hits_total < min_hits:
                _drop("required_skills_weak")
                continue

        domain_hits = sum(1 for d in req_domains if _domain_hit(ctx_domain, d))
        if req_domains and strict_stack and domain_hits < len(req_domains):
            _drop("domain_mismatch")
            continue

        core_hits = len(lang_tokens & _CORE)
        bonus_hits = len(lang_tokens & _BONUS)
        if req_langs:
            lang_hits_req: Optional[int] = len(lang_tokens & req_langs)
            if lang_hits_req < 1:
                _drop("lang_stack_weak")
                continue
        else:
            lang_hits_req = None
            # Require at least one CORE/bonus language only when the query
            # names no language itself; otherwise a matched non-core language
            # (e.g. C#) would always be rejected.
            # NOTE(review): the collapsed source does not show whether this
            # check was already nested under this branch - confirm.
            if core_hits + bonus_hits < 1:
                _drop("stack_too_weak")
                continue

        if req_english:
            lvl = str(it.get("english_level") or "").upper()
            if not lvl or _EN_ORDER.get(lvl, 0) < _EN_ORDER.get(req_english, 0):
                _drop("english_below_required")
                continue

        it2 = dict(it)
        it2["_fit"] = {
            "core_hits": core_hits,
            "bonus_cpp": bool(bonus_hits),
            "req_lang_hits": lang_hits_req,
            "req_skill_hits": skill_hits_total,
            "req_skill_hits_body": skill_hits_body,
            "req_domain_hits": domain_hits,
            "backend_role": "backend" in roles,
            "backend_focus": it.get("backend_focus"),
        }
        filtered.append(it2)

    def _sort_key(x: Dict[str, Any]):
        """Order by fit (domains, evidence, skills, languages, backend), then score, then rank."""
        fit = x.get("_fit") or {}
        core_hits = int(fit.get("core_hits", 0))
        bonus = 1 if fit.get("bonus_cpp") else 0
        backend_bonus = 1 if fit.get("backend_role") or fit.get("backend_focus") else 0
        req_skill_hits = int(fit.get("req_skill_hits", 0))
        req_skill_hits_body = int(fit.get("req_skill_hits_body", 0))
        req_domain_hits = int(fit.get("req_domain_hits", 0))
        rank = _to_float(x.get("rank"))
        if rank is None:
            rank = 0.0

        # manual scoring by domain-specific signals
        score = 0.0
        if "go" in {_norm_token(str(pl)) for pl in (x.get("primary_languages") or [])}:
            score += 5.0  # Go as a primary language
        eng_years = _to_float(x.get("experience_years_eng"))
        if eng_years and eng_years >= max(4.0, req_exp or 0):
            score += 3.0

        item_skills = {_norm_token(s) for s in (x.get("skills") or [])}
        if any(kw in item_skills for kw in ("kubernetes", "k8s")):
            score += 1.5
        if any(kw in item_skills for kw in ("ddd", "domain-driven design", "eda", "event-driven")):
            score += 2.0
        if any(kw in item_skills for kw in ("fintech", "trading", "crypto", "exchange", "биржа", "финтех")):
            score += 2.5
        snippet = (x.get("snippet") or "").lower()
        if any(
            kw in snippet
            for kw in ("highload", "high-load", "high throughput", "high-throughput", "low latency", "low-latency")
        ):
            score += 1.5

        return (
            -req_domain_hits,
            -req_skill_hits_body,
            -req_skill_hits,
            -core_hits,
            -backend_bonus,
            -bonus,
            -score,
            rank,
        )

    filtered.sort(key=_sort_key)

    dbg = {
        "postfilter_applied": True,
        "required_exp": req_exp,
        "required_languages": sorted(req_langs),
        "required_skills": req_skills,
        "must_have_skills": must_have_skills,
        "required_domains": req_domains,
        "strict_stack": strict_stack,
        "dropped": sum(reasons.values()),
        "reasons": reasons,
    }
    return filtered, dbg


# --------- Refinement loop ----------

def _refine_plan_no_llm(plan: SearchPlan, result_count: int, user_prompt: str) -> SearchPlan:
    """Return a relaxed copy of ``plan`` when the search found nothing.

    With a non-zero ``result_count`` the copy is returned unchanged.
    """
    p = SearchPlan(**asdict(plan))
    if result_count != 0:
        return p

    p.location = None
    p.salary_min = None
    p.salary_max = None
    p.english_min = None
    # a strict remote filter is relaxed here; the override below re-applies it
    p.remote = None
    # lower the experience requirement gradually
    if p.exp_years_min is not None:
        p.exp_years_min = max(0.0, float(p.exp_years_min) - 1.0)
    if not (p.query_text or "").strip():
        p.query_text = " ".join(p.skills_any[:8])
    _apply_work_mode_overrides(user_prompt, p)
    return p


def agent_search(
    con: sqlite3.Connection,
    user_prompt: str,
    max_iters: int = 2,
    limit: int = 20,
) -> Dict[str, Any]:
    """Plan, run and (if nothing is found) iteratively relax a resume search.

    The plan comes from the heuristics and, when an LLM is configured, is
    refined by it. An LLM failure never aborts the search: the heuristic plan
    is used instead and the error text is reported under ``"llm_error"``.

    Returns a dict with ``plan``, ``items``, ``count``, ``history``,
    ``llm_used``, ``llm_error`` and ``postfilter``.
    """
    draft = _heuristic_plan(user_prompt)
    draft.limit = limit

    names_only_query = _looks_like_name_list(user_prompt)
    llm_on = _llm_enabled()
    llm_error: Optional[str] = None

    plan = draft
    if llm_on and not names_only_query:
        try:
            plan = _llm_build_plan(user_prompt, draft)
        except Exception as exc:  # the LLM is optional: fall back to heuristics
            llm_error = f"{type(exc).__name__}: {exc}"
            plan = draft
    plan.limit = limit

    history: List[Dict[str, Any]] = []
    final_items: List[Dict[str, Any]] = []
    last_iter = max(0, max_iters)  # always run the search at least once

    for i in range(last_iter + 1):
        _apply_work_mode_overrides(user_prompt, plan)
        res = search_with_filters(con, plan)
        items = res.get("items", [])
        count = int(res.get("count", len(items)))

        history.append(
            {
                "plan": asdict(plan),
                "count": count,
                "top_snippets": [(it.get("snippet") or "")[:180] for it in items[:5]],
            }
        )

        if count > 0 or i == last_iter:
            final_items = items
            break

        # refine
        refined: Optional[SearchPlan] = None
        if llm_on:
            msgs = [
                {
                    "role": "system",
                    "content": (
                        "Ты корректируешь JSON SearchPlan. Верни ТОЛЬКО JSON с полями SearchPlan.\n"
                        "Если 0 результатов — ослабь фильтры: remote=null, exp_years_min уменьшить/обнулить, "
                        "location/salary/english убрать. skills_any сохранить.\n"
                        "Никаких лишних ключей. Помни: 'гибрид' НЕ означает remote=true.\n"
                    ),
                },
                {
                    "role": "user",
                    "content": json.dumps(
                        {
                            "query": user_prompt,
                            "previous_plan": asdict(plan),
                            "result_count": count,
                        },
                        ensure_ascii=False,
                    ),
                },
            ]
            try:
                obj = _sanitize_plan_dict(_llm_call_json(msgs))
                refined = SearchPlan(**{**asdict(plan), **obj})
            except Exception as exc:  # fall back to the rule-based relaxation
                llm_error = f"{type(exc).__name__}: {exc}"
                refined = None

        if refined is not None:
            plan = refined
            plan.skills_any = _uniq_keep_order(_filter_skills_vs_location(plan.skills_any, plan.location))
            plan.skills_all = _uniq_keep_order(_filter_skills_vs_location(plan.skills_all, plan.location))
            plan.roles_any = _uniq_keep_order(plan.roles_any)
            plan.query_text = _simplify_query_text(user_prompt, plan.skills_any)
            if plan.sort not in ("rank", "exp_desc", "salary_desc"):
                plan.sort = "rank"
            _apply_work_mode_overrides(user_prompt, plan)
        else:
            plan = _refine_plan_no_llm(plan, count, user_prompt)
        plan.limit = limit

    # ---- 1) dedupe ----
    deduped = _dedupe_by_candidate_best_rank(final_items)

    # ---- 2) postfilter for vacancy-like queries ----
    post_dbg: Dict[str, Any] = {"postfilter_applied": False}
    if _needs_postfilter(user_prompt):
        filtered, post_dbg = _jobfit_filter_items(con, user_prompt, deduped, plan=plan)
    else:
        filtered = deduped

    return {
        "plan": asdict(plan),
        "items": filtered,
        "count": len(filtered),
        "history": history,
        "llm_used": llm_on and llm_error is None,
        "llm_error": llm_error,
        "postfilter": post_dbg,
    }