"""Build a JSONL training dataset from extracted document text.

Reads a manifest of extracted documents, cleans and chunks each text file,
optionally drops front/back matter and off-topic chunks via an options-trading
keyword heuristic, and writes one JSON record per chunk plus a stats file.
"""

from __future__ import annotations

import argparse
import json
import re
from pathlib import Path

# Keyword -> weight used to score how "options-trading" a chunk of text is.
_OPTIONS_KEYWORDS: dict[str, float] = {
    # Core contract terms.
    "option": 2.0, "options": 2.0, "call": 1.0, "put": 1.0, "strike": 2.0,
    "expiration": 2.0, "expiry": 2.0, "premium": 2.0, "contract": 1.0,
    "underlying": 2.0, "open interest": 3.0, "bid-ask": 3.0, "bid ask": 3.0,
    "assignment": 3.0, "exercise": 2.0, "early exercise": 4.0,
    # Greeks and volatility.
    "delta": 3.0, "gamma": 3.0, "theta": 3.0, "vega": 3.0, "rho": 2.0,
    "implied volatility": 4.0, "historical volatility": 3.0,
    "volatility smile": 3.0, "skew": 2.0, "iv": 1.5,
    # Strategies.
    "spread": 2.0, "vertical spread": 4.0, "calendar spread": 4.0,
    "diagonal spread": 4.0, "credit spread": 4.0, "debit spread": 4.0,
    "iron condor": 5.0, "butterfly": 3.0, "straddle": 4.0, "strangle": 4.0,
    "covered call": 5.0, "protective put": 5.0, "cash-secured put": 5.0,
    "ratio spread": 4.0,
    # Pricing, valuation, and margin.
    "intrinsic value": 4.0, "time value": 4.0, "extrinsic value": 4.0,
    "breakeven": 3.0, "probability of profit": 4.0, "expected value": 3.0,
    "black-scholes": 5.0, "black scholes": 5.0, "binomial": 3.0, "greeks": 4.0,
    "margin": 2.0, "reg t": 2.0, "portfolio margin": 4.0,
}

# Phrases that usually mark publisher front/back matter rather than content.
_JUNK_PHRASES = [
    "all rights reserved", "no part of this publication", "printed in",
    "publisher", "isbn", "library of congress", "copyright", "acknowledg",
    "about the author", "disclaimer", "warranty",
]


def _fix_drop_caps(text: str) -> str:
    # Join single-letter drop caps like "O ptions" -> "Options".
    for _ in range(6):
        fixed = re.sub(r"\b([A-Za-z])\s+(?=[a-z])", r"\1", text)
        if fixed == text:
            break
        text = fixed
    return text


def _clean_text(text: str) -> str:
    text = text.replace("\u00ad", "")  # soft hyphen
    text = text.replace("\u200b", "")  # zero-width space
    text = _fix_drop_caps(text)
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def _normalize_for_score(text: str) -> str:
    text = _fix_drop_caps(text)
    text = text.lower()
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def _keyword_score(text: str) -> float:
    """Weighted count of options-trading keywords in the normalized text."""
    t = " " + _normalize_for_score(text) + " "
    score = 0.0
    for kw, weight in _OPTIONS_KEYWORDS.items():
        if " " in kw:
            # Multi-word phrases: padded substring count on the normalized text.
            n = t.count(" " + kw + " ")
        else:
            n = len(re.findall(rf"\b{re.escape(kw)}\b", t))
        if n:
            score += weight * n
    return score


def _looks_like_junk(text: str) -> bool:
    """Heuristic check for front/back matter (TOC, index, copyright pages)."""
    head = _normalize_for_score(text)[:800]
    if "table of contents" in head or re.search(r"\bcontents\b", head):
        return True
    if re.search(r"^\s*index\b", head):
        return True
    if any(p in head for p in _JUNK_PHRASES):
        return True
    return False


def _chunk_text(text: str, *, chunk_chars: int, overlap_chars: int) -> list[str]:
    """Split text into fixed-size character windows with a fixed overlap."""
    if chunk_chars <= 0:
        raise ValueError("chunk_chars must be > 0")
    if overlap_chars < 0:
        raise ValueError("overlap_chars must be >= 0")
    if overlap_chars >= chunk_chars:
        raise ValueError("overlap_chars must be < chunk_chars")
    chunks: list[str] = []
    start = 0
    while start < len(text):
        end = min(start + chunk_chars, len(text))
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end == len(text):
            break
        start = end - overlap_chars
    return chunks


def main() -> int:
    parser = argparse.ArgumentParser(description="Build a JSONL dataset from extracted docs.")
    parser.add_argument("--manifest", type=Path, default=Path("training_data/manifest.json"))
    parser.add_argument("--text-dir", type=Path, default=Path("training_data/text"))
    parser.add_argument("--out", type=Path, default=Path("training_data/dataset.jsonl"))
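    # Chunking and filtering knobs: text is sliced into character windows with a
    # fixed overlap; chunks shorter than --min-chars are dropped, --drop-junk
    # discards chunks that look like front/back matter, and --min-score drops
    # chunks below the keyword-score threshold (0 disables that filter).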
parser.add_argument("--chunk-chars", type=int, default=6000) parser.add_argument("--overlap-chars", type=int, default=400) parser.add_argument("--min-chars", type=int, default=1200) parser.add_argument("--min-score", type=float, default=0.0) parser.add_argument("--drop-junk", action="store_true") args = parser.parse_args() manifest = json.loads(args.manifest.read_text(encoding="utf-8")) docs = manifest.get("docs", []) if not docs: raise SystemExit(f"No docs in manifest: {args.manifest}") args.out.parent.mkdir(parents=True, exist_ok=True) n_docs = 0 n_chunks = 0 with args.out.open("w", encoding="utf-8") as f: for doc in docs: doc_id = doc["id"] primary = doc["primary"] txt_path = args.text_dir / f"{doc_id}.txt" if not txt_path.exists(): continue raw = txt_path.read_text(encoding="utf-8", errors="ignore") cleaned = _clean_text(raw) if len(cleaned) < args.min_chars: continue n_docs += 1 chunks = _chunk_text(cleaned, chunk_chars=args.chunk_chars, overlap_chars=args.overlap_chars) for i, chunk in enumerate(chunks): if len(chunk) < args.min_chars: continue if args.drop_junk and _looks_like_junk(chunk): continue if args.min_score > 0 and _keyword_score(chunk) < args.min_score: continue rec = { "text": chunk, "source": primary, "doc_id": doc_id, "chunk_index": i, } f.write(json.dumps(rec, ensure_ascii=False) + "\n") n_chunks += 1 stats_path = args.out.with_suffix(".stats.json") stats_path.write_text( json.dumps( { "docs_used": n_docs, "chunks_written": n_chunks, "chunk_chars": args.chunk_chars, "overlap_chars": args.overlap_chars, "min_chars": args.min_chars, "min_score": args.min_score, "drop_junk": args.drop_junk, }, indent=2, ), encoding="utf-8", ) print(f"Wrote {n_chunks} chunks from {n_docs} docs to {args.out}") print(f"Stats: {stats_path}") return 0 if __name__ == "__main__": raise SystemExit(main())