# 100 lines · 3.3 KiB · Python (file-listing metadata, not code)
from __future__ import annotations
|
|
|
|
import os
|
|
from pathlib import Path
|
|
import logging
|
|
from bs4 import BeautifulSoup
|
|
|
|
try: # optional dependency for PDF fallback
|
|
from pypdf import PdfReader as _PdfReader # type: ignore
|
|
except Exception: # pragma: no cover - optional import
|
|
try:
|
|
from PyPDF2 import PdfReader as _PdfReader # type: ignore
|
|
except Exception: # pragma: no cover
|
|
_PdfReader = None # type: ignore
|
|
|
|
def _read_bytes(path: Path) -> bytes:
|
|
return path.read_bytes()
|
|
|
|
def extract_text_from_txt(path: Path) -> str:
    """
    Decode a text file, trying a list of encodings in order.

    Each candidate is tried *strictly* so that a mismatch actually falls
    through to the next encoding.  (The previous version decoded with
    errors="ignore" inside the loop, which can never raise, so the first
    attempt always "succeeded" and the fallback encodings were dead code.)
    UTF-16 is only attempted when a BOM is present, because without one
    UTF-16 happily decodes almost any even-length byte string into garbage.

    Returns the decoded text; as a last resort decodes as UTF-8 while
    dropping undecodable bytes, so this never raises for readable files.
    """
    data = path.read_bytes()
    # b"\xff\xfe" = UTF-16 LE BOM, b"\xfe\xff" = UTF-16 BE BOM.
    if data[:2] in (b"\xff\xfe", b"\xfe\xff"):
        return data.decode("utf-16", errors="ignore")
    for enc in ("utf-8", "cp1251", "latin-1"):
        try:
            return data.decode(enc)
        except UnicodeDecodeError:
            continue
    # latin-1 above accepts every byte value, so this is a pure safety net.
    return data.decode("utf-8", errors="ignore")
|
|
|
|
def extract_text_from_html(path: Path) -> str:
    """Parse an HTML file and return its visible text, nodes joined by newlines."""
    markup = extract_text_from_txt(path)
    return BeautifulSoup(markup, "lxml").get_text("\n", strip=True)
|
|
|
|
def extract_text_from_docx(path: Path) -> str:
    """
    Extract text from a .docx file: non-empty paragraphs first, then table
    rows rendered as " | "-joined cell texts, all joined by newlines.
    """
    from docx import Document

    document = Document(str(path))
    chunks = [
        para.text.strip()
        for para in document.paragraphs
        if para.text and para.text.strip()
    ]
    for tbl in document.tables:
        for row in tbl.rows:
            cell_texts = [
                cell.text.strip()
                for cell in row.cells
                if cell.text and cell.text.strip()
            ]
            if cell_texts:
                chunks.append(" | ".join(cell_texts))
    return "\n".join(chunks)
|
|
|
|
# Maximum number of PDF pages to read, overridable via the PDF_PAGE_LIMIT
# environment variable.  A value of 0 disables the cap (the extractor checks
# truthiness before comparing).  NOTE: a non-integer value raises at import.
_PDF_PAGE_LIMIT = int(os.environ.get("PDF_PAGE_LIMIT", "40"))

# Silence noisy pypdf warnings like "Ignoring wrong pointing object ..."
# (both logger names are configured since either backend may be the one loaded).
logging.getLogger("pypdf").setLevel(logging.ERROR)
logging.getLogger("PyPDF2").setLevel(logging.ERROR)
|
|
|
|
|
|
def extract_text_from_pdf(path: Path) -> str:
    """
    Extract text from a PDF via the optional pypdf/PyPDF2 backend.

    Scans at most PDF_PAGE_LIMIT pages (default 40; 0 means no cap) so that
    pathological documents cannot stall processing.  Pages whose extraction
    fails are skipped silently.

    Raises RuntimeError when no PDF backend is installed or when the file
    cannot be parsed at all.
    """
    if _PdfReader is None:
        raise RuntimeError("PDF reader dependency missing (install pypdf or PyPDF2)")

    try:
        reader = _PdfReader(str(path), strict=False)
    except Exception as exc:  # pragma: no cover - pdf parser edge cases
        raise RuntimeError(f"PDF read failed: {exc}") from exc

    collected = []
    for page_no, page in enumerate(getattr(reader, "pages", [])):
        if _PDF_PAGE_LIMIT and page_no >= _PDF_PAGE_LIMIT:
            break
        try:
            page_text = page.extract_text()  # type: ignore[attr-defined]
        except Exception:
            page_text = None
        if page_text:
            collected.append(page_text)
    return "\n".join(collected)
|
|
|
|
def extract_text_from_doc_best_effort(path: Path) -> str:
    """
    Best-effort text extraction for legacy .doc files.

    Legacy .doc needs external tooling, so this only works when the optional
    ``textract`` package is installed; any failure (missing package, broken
    file, tool error) yields an empty string rather than an exception.
    """
    try:
        import textract  # type: ignore

        raw = textract.process(str(path))
    except Exception:
        return ""
    return raw.decode("utf-8", errors="ignore")
|
|
|
|
def extract_text(path: Path) -> str:
    """
    Route *path* to the extractor matching its (case-insensitive) extension.

    Supported: .txt/.log, .html/.htm, .docx, .pdf, and .doc (best effort).
    Any other extension yields an empty string.
    """
    suffix = path.suffix.lower()
    if suffix in {".txt", ".log"}:
        return extract_text_from_txt(path)
    if suffix in {".html", ".htm"}:
        return extract_text_from_html(path)
    if suffix == ".docx":
        return extract_text_from_docx(path)
    if suffix == ".pdf":
        return extract_text_from_pdf(path)
    if suffix == ".doc":
        return extract_text_from_doc_best_effort(path)
    return ""
|