"""Command-line interface for tg_resume_db.

Subcommands:
    import_exports  incrementally import Telegram chat exports into SQLite
    search          query imported resumes (plain filters or LLM-assisted)
    inspect         debug extraction for a single resume file
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict

from tg_resume_db.bundle_export import bundle_search_results
from tg_resume_db.db import connect, init_db
from tg_resume_db.extract.clean import normalize_text
from tg_resume_db.extract.doc_type import detect_doc_type
from tg_resume_db.extract.parse import extract_name_guess
from tg_resume_db.extract.pdf_extract import extract_pdf_best
from tg_resume_db.extract.sections import sections_present, split_sections
from tg_resume_db.extract.text_extract import extract_text as extract_text_generic
from tg_resume_db.pipeline import import_exports as run_import
from tg_resume_db.search import search as run_search
from tg_resume_db.util import Logger


def _print_json(obj: Dict[str, Any]) -> None:
    s = json.dumps(obj, ensure_ascii=False, indent=2)
    try:
        print(s)
    except UnicodeEncodeError:
        # Fallback for cp1251/legacy consoles.
        print(s.encode("ascii", "backslashreplace").decode("ascii"))


def _is_interactive() -> bool:
    return sys.stdin.isatty() and sys.stdout.isatty()


def main() -> None:
    ap = argparse.ArgumentParser(prog="tg_resume_db")
    sub = ap.add_subparsers(dest="cmd", required=True)

    # ---------------- import_exports ----------------
    imp = sub.add_parser("import_exports", help="Import Telegram exports recursively (incremental)")
    imp.add_argument("--input", required=True, help="Path to exports directory")
    imp.add_argument("--db", required=True, help="SQLite db path")
    imp.add_argument("--log", default="./import.log", help="Log file path")
    imp.add_argument("--near-dist", type=int, default=6, help="Simhash max Hamming distance for near-duplicates")
    imp.add_argument("--min-text-len", type=int, default=250, help="Skip very short texts")
    imp.add_argument(
        "--llm",
        choices=["auto", "off", "force"],
        default="auto",
        help="LLM enrichment mode: auto (default), off to disable, force to always run when configured",
    )
    imp.add_argument(
        "--llm-review",
        choices=["always", "auto", "off"],
        default="always",
        help="LLM review mode for parsed JSON: always (default), auto, off",
    )
    imp.add_argument(
        "--llm-review-rounds",
        type=int,
        default=1,
        help="How many LLM review merge rounds to run per resume (1..3)",
    )

    # ---------------- search ----------------
    s = sub.add_parser("search", help="Search candidates")
    s.add_argument("--db", required=True)
    s.add_argument("--query", required=True)
    s.add_argument("--limit", type=int, default=20)
    s.add_argument("--offset", type=int, default=0)
    s.add_argument("--remote", choices=["true", "false"], default=None)
    s.add_argument("--location", default=None)
    s.add_argument("--experience-min", type=float, default=None)
    s.add_argument("--salary-min", type=int, default=None)
    s.add_argument("--salary-max", type=int, default=None)
    s.add_argument("--english", default=None)
    s.add_argument("--doc-type", default=None)
    # AI mode
    s.add_argument("--ai", action="store_true", help="Use LLM to build filters from text query and run search")
    s.add_argument("--ai-iters", type=int, default=2, help="How many refine iterations for AI search")
    # Backward-compatible single-value filters
    s.add_argument("--role", default=None, help="Single role (backward compatible); prefer --roles-any")
    s.add_argument("--skill", default=None, help="Single skill (backward compatible); prefer --skills-any/--skills-all")
    # Stack filters (comma-separated)
    s.add_argument("--roles-any", default=None, help="Comma-separated roles; at least one must match")
s.add_argument("--skills-any", default=None, help="Comma-separated skills; at least one must match") s.add_argument("--skills-all", default=None, help="Comma-separated skills; all must match") # Bundle export behavior s.add_argument("--bundle", choices=["ask", "yes", "no"], default="ask", help="Bundle found resumes into a folder") # ---------------- inspect ---------------- ins = sub.add_parser("inspect", help="Inspect a single resume file (doc_type/sections)") ins.add_argument("--file", required=True, help="Path to resume file") args = ap.parse_args() # ========================= import_exports ========================= if args.cmd == "import_exports": con = connect(args.db) try: init_db(con) log = Logger(args.log) prev_enabled = os.environ.get("LLM_PARSE_ENABLED") prev_force = os.environ.get("LLM_PARSE_FORCE") prev_review_mode = os.environ.get("LLM_PARSE_REVIEW_MODE") prev_review_rounds = os.environ.get("LLM_PARSE_REVIEW_ROUNDS") try: if args.llm == "off": os.environ["LLM_PARSE_ENABLED"] = "0" os.environ["LLM_PARSE_REVIEW_MODE"] = "off" elif args.llm == "force": os.environ["LLM_PARSE_ENABLED"] = "1" os.environ["LLM_PARSE_FORCE"] = "1" os.environ["LLM_PARSE_REVIEW_MODE"] = "always" else: os.environ["LLM_PARSE_REVIEW_MODE"] = args.llm_review rounds = max(1, min(int(args.llm_review_rounds), 3)) os.environ["LLM_PARSE_REVIEW_ROUNDS"] = str(rounds) stats = run_import( con=con, input_dir=args.input, log=log, max_near_dist=args.near_dist, min_text_len=args.min_text_len, ) finally: if args.llm == "off": if prev_enabled is None: os.environ.pop("LLM_PARSE_ENABLED", None) else: os.environ["LLM_PARSE_ENABLED"] = prev_enabled elif args.llm == "force": if prev_enabled is None: os.environ.pop("LLM_PARSE_ENABLED", None) else: os.environ["LLM_PARSE_ENABLED"] = prev_enabled if prev_force is None: os.environ.pop("LLM_PARSE_FORCE", None) else: os.environ["LLM_PARSE_FORCE"] = prev_force if prev_review_mode is None: os.environ.pop("LLM_PARSE_REVIEW_MODE", None) else: os.environ["LLM_PARSE_REVIEW_MODE"] = prev_review_mode if prev_review_rounds is None: os.environ.pop("LLM_PARSE_REVIEW_ROUNDS", None) else: os.environ["LLM_PARSE_REVIEW_ROUNDS"] = prev_review_rounds finally: con.close() _print_json(stats) return # ============================= search ============================= if args.cmd == "search": con = connect(args.db) init_db(con) # важно: гарантирует, что resumes_fts и триггеры существуют try: items: list[Dict[str, Any]] = [] out: Dict[str, Any] = {} if args.ai: from tg_resume_db.agent import agent_search res = agent_search( con, user_prompt=args.query, max_iters=args.ai_iters, ) items = res.get("items", []) out = { "ai": True, "llm_used": res.get("llm_used", False), "plan": res.get("plan"), "history": res.get("history"), "postfilter": res.get("postfilter"), "items": items, "count": res.get("count", len(items)), } else: filters = { "remote": (args.remote == "true") if args.remote is not None else None, "location": args.location, "experience_min": args.experience_min, "salary_min": args.salary_min, "salary_max": args.salary_max, "english": args.english, "doc_type": args.doc_type, # backward compat "role": args.role, "skill": args.skill, # new "roles_any": args.roles_any, "skills_any": args.skills_any, "skills_all": args.skills_all, } items = run_search( con, query=args.query, filters=filters, limit=args.limit, offset=args.offset, ) out = {"ai": False, "items": items, "count": len(items)} # 1) печатаем результаты _print_json(out) # 2) bundle prompt/flag if args.bundle == "yes": do_bundle = True elif 
args.bundle == "no": do_bundle = False else: # ask do_bundle = False if _is_interactive(): ans = input("\nСобрать найденные резюме в папку? (Y/N): ").strip().lower() do_bundle = ans in ("y", "yes", "да", "д") if do_bundle: ts = datetime.now().strftime("%Y%m%d_%H%M%S") out_dir = f"./bundle_{ts}" info = bundle_search_results(con, items, out_dir, copy_files=True, merge_text=True) print(f"\n[done] Готово: {info['out_dir']}") print(f" files copied: {info['copied_files']}, missing: {info['missing_files']}") print(f" merged: {info['merged_text']}") print(f" manifest: {info['manifest']}") return finally: con.close() # ============================= inspect ============================= if args.cmd == "inspect": fp = args.file path = Path(fp) extract_meta = {} if path.suffix.lower() == ".pdf": pdf_res = extract_pdf_best(path, timeout_sec=25) raw_text = pdf_res.text extract_meta = { "method": pdf_res.method, "quality_score": pdf_res.score, "quality_flags": pdf_res.flags, "pages": len(pdf_res.pages), } else: raw_text = extract_text_generic(path) extract_meta = {"method": "generic"} clean = normalize_text(raw_text or "") dt = detect_doc_type(clean, file_ext=Path(fp).suffix.lower()) secs = split_sections(clean, dt.doc_type) out = { "file": fp, "doc_type": dt.doc_type, "confidence": dt.confidence, "signals": dt.signals, "extract": extract_meta, "sections_present": sections_present(secs), "name_guess": extract_name_guess(clean), } _print_json(out) return if __name__ == "__main__": main()