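"""Build a deduplicated plain-text corpus from local PDF, EPUB, TXT, and MD files.

The script walks an input directory, extracts text per format (pypdf for PDF,
ebooklib + BeautifulSoup for EPUB, chardet-guessed decoding for plain text),
normalizes each document, and dedupes on the SHA-256 of the normalized text.
Unique documents are written to <out>/text/<hash>.txt, concatenated into
<out>/corpus.txt, and described in <out>/manifest.json; skipped files are
listed in <out>/rejected.json.

Run as a script; see --help for the --input, --out, and --min-chars options.
"""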
from __future__ import annotations
import argparse
import hashlib
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable


@dataclass(frozen=True)
class ExtractedDoc:
    source_path: str
    text: str


def _normalize_for_hash(text: str) -> str:
    """Normalize text for duplicate detection only; the original text is kept for output."""
    text = text.replace("\u00ad", "")  # soft hyphen
    text = text.replace("\u200b", "")  # zero-width space
    text = text.lower()
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def _sha256_text(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()


def _extract_pdf(path: Path) -> str:
    """Extract text page by page; a page that fails to parse becomes an empty string."""
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    parts: list[str] = []
    for page in reader.pages:
        try:
            parts.append(page.extract_text() or "")
        except Exception:
            parts.append("")
    return "\n".join(parts)


def _extract_epub(path: Path) -> str:
    from bs4 import BeautifulSoup
    from ebooklib import ITEM_DOCUMENT, epub

    book = epub.read_epub(str(path))
    parts: list[str] = []
    for item in book.get_items():
        if item.get_type() != ITEM_DOCUMENT:
            continue
        soup = BeautifulSoup(item.get_body_content(), "lxml")
        parts.append(soup.get_text("\n", strip=True))
    return "\n".join(parts)


def _read_text_file(path: Path) -> str:
    """Read a plain-text file, guessing its encoding with chardet."""
    import chardet

    raw = path.read_bytes()
    guess = chardet.detect(raw)
    encoding = guess.get("encoding") or "utf-8"
    try:
        return raw.decode(encoding, errors="replace")
    except LookupError:
        # chardet may report an encoding name Python does not recognize.
        return raw.decode("utf-8", errors="replace")


def extract_text(path: Path) -> ExtractedDoc | None:
    """Dispatch on file extension; return None for unsupported types or failed extraction."""
    suffix = path.suffix.lower()
    try:
        if suffix == ".pdf":
            return ExtractedDoc(str(path), _extract_pdf(path))
        if suffix == ".epub":
            return ExtractedDoc(str(path), _extract_epub(path))
        if suffix in {".txt", ".md"}:
            return ExtractedDoc(str(path), _read_text_file(path))
    except Exception:
        return None
    return None


def iter_candidate_files(root: Path) -> Iterable[Path]:
    exts = {".pdf", ".epub", ".txt", ".md"}
    for path in root.rglob("*"):
        if not path.is_file():
            continue
        if path.suffix.lower() not in exts:
            continue
        yield path


def main() -> int:
    parser = argparse.ArgumentParser(description="Extract and dedupe local documents into a plain-text corpus.")
    parser.add_argument("--input", type=Path, default=Path("eBooks"), help="Input directory to scan (default: eBooks).")
    parser.add_argument(
        "--out",
        type=Path,
        default=Path("training_data"),
        help="Output directory (default: training_data).",
    )
    parser.add_argument(
        "--min-chars",
        type=int,
        default=2000,
        help="Skip extracted docs shorter than this (default: 2000).",
    )
    args = parser.parse_args()

    in_dir: Path = args.input
    out_dir: Path = args.out
    out_text_dir = out_dir / "text"
    out_text_dir.mkdir(parents=True, exist_ok=True)

    manifest_path = out_dir / "manifest.json"
    corpus_path = out_dir / "corpus.txt"
    rejected_path = out_dir / "rejected.json"

    docs: dict[str, dict] = {}
    rejected: list[dict] = []
    seen_hashes: set[str] = set()

    candidates = sorted(iter_candidate_files(in_dir))
    for file_path in candidates:
        extracted = extract_text(file_path)
        if extracted is None:
            rejected.append({"path": str(file_path), "reason": "extract_failed"})
            continue

        normalized = _normalize_for_hash(extracted.text)
        if len(normalized) < args.min_chars:
            rejected.append({"path": str(file_path), "reason": "too_short"})
            continue

        doc_hash = _sha256_text(normalized)
        if doc_hash in seen_hashes:
            docs[doc_hash]["duplicates"].append(str(file_path))
            continue

        seen_hashes.add(doc_hash)
        out_txt = out_text_dir / f"{doc_hash}.txt"
        out_txt.write_text(extracted.text, encoding="utf-8", errors="ignore")

        docs[doc_hash] = {
            "id": doc_hash,
            "primary": str(file_path),
            "duplicates": [],
            "chars": len(extracted.text),
        }

    manifest_path.write_text(json.dumps({"docs": list(docs.values())}, indent=2), encoding="utf-8")
    rejected_path.write_text(json.dumps(rejected, indent=2), encoding="utf-8")

    # Build concatenated corpus
    with corpus_path.open("w", encoding="utf-8", errors="ignore") as f:
        for doc in docs.values():
            f.write("\n\n" + "=" * 80 + "\n")
            f.write(f"SOURCE: {doc['primary']}\n")
            f.write("=" * 80 + "\n\n")
            f.write((out_text_dir / f"{doc['id']}.txt").read_text(encoding="utf-8", errors="ignore"))
            f.write("\n")

    print(f"Extracted unique docs: {len(docs)}")
    print(f"Wrote corpus: {corpus_path}")
    print(f"Manifest: {manifest_path}")
    if rejected:
        print(f"Rejected: {len(rejected)} (see {rejected_path})")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())