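"""Select option-trading-relevant pages/sections from PDF and EPUB files.

Each PDF page or EPUB document section is scored against a weighted
options-vocabulary keyword list; table-of-contents, index, and front-matter
pages are filtered out. Kept segments are written to the output directory as
individual text files plus a manifest (manifest.json), a per-segment report
(report.csv), and a concatenated corpus (corpus.txt).
"""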
from __future__ import annotations

import argparse
import csv
import hashlib
import json
import math
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable


_OPTIONS_KEYWORDS: dict[str, float] = {
    # core
    "option": 2.0,
    "options": 2.0,
    "call": 1.0,
    "put": 1.0,
    "strike": 2.0,
    "expiration": 2.0,
    "expiry": 2.0,
    "premium": 2.0,
    "contract": 1.0,
    "underlying": 2.0,
    "open interest": 3.0,
    "bid-ask": 3.0,
    "bid ask": 3.0,
    "assignment": 3.0,
    "exercise": 2.0,
    "early exercise": 4.0,
    # greeks / vol
    "delta": 3.0,
    "gamma": 3.0,
    "theta": 3.0,
    "vega": 3.0,
    "rho": 2.0,
    "implied volatility": 4.0,
    "historical volatility": 3.0,
    "volatility smile": 3.0,
    "skew": 2.0,
    "iv": 1.5,
    # strategies
    "spread": 2.0,
    "vertical spread": 4.0,
    "calendar spread": 4.0,
    "diagonal spread": 4.0,
    "credit spread": 4.0,
    "debit spread": 4.0,
    "iron condor": 5.0,
    "butterfly": 3.0,
    "straddle": 4.0,
    "strangle": 4.0,
    "covered call": 5.0,
    "protective put": 5.0,
    "cash-secured put": 5.0,
    "ratio spread": 4.0,
    # risk / pricing
    "intrinsic value": 4.0,
    "time value": 4.0,
    "extrinsic value": 4.0,
    "breakeven": 3.0,
    "probability of profit": 4.0,
    "expected value": 3.0,
    "black-scholes": 5.0,
    "black scholes": 5.0,
    "binomial": 3.0,
    "greeks": 4.0,
    "margin": 2.0,
    "reg t": 2.0,
    "portfolio margin": 4.0,
}

_JUNK_PHRASES = [
    "all rights reserved",
    "no part of this publication",
    "printed in",
    "publisher",
    "isbn",
    "library of congress",
    "copyright",
    "acknowledg",
    "about the author",
    "disclaimer",
    "warranty",
]


@dataclass(frozen=True)
class Segment:
    source_path: str
    locator: str  # "page:123" or "section:foo"
    text: str
    score: float


def _fix_drop_caps(text: str) -> str:
    # Join single-letter drop caps like "O ptions" -> "Options".
    for _ in range(6):
        fixed = re.sub(r"\b([A-Za-z])\s+(?=[a-z])", r"\1", text)
        if fixed == text:
            break
        text = fixed
    return text


def _normalize(text: str) -> str:
    text = text.replace("\u00ad", "")  # soft hyphen
    text = text.replace("\u200b", "")  # zero-width space
    text = _fix_drop_caps(text)
    text = text.lower()
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def _sha256(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()


def _tokenish(text: str) -> list[str]:
    return re.findall(r"[a-z]{2,}", text.lower())


def _keyword_score(text: str) -> tuple[float, dict[str, int]]:
    # Weighted keyword score: multi-word keywords are counted by whole-phrase
    # matches, single-word keywords by word-boundary matches.
    t = " " + _normalize(text) + " "
    hits: dict[str, int] = {}
    score = 0.0
    for kw, weight in _OPTIONS_KEYWORDS.items():
        if " " in kw:
            n = t.count(" " + kw + " ")
        else:
            n = len(re.findall(rf"\b{re.escape(kw)}\b", t))
        if n:
            hits[kw] = n
            score += weight * n
    return score, hits


def _looks_like_toc(text: str) -> bool:
    t = _normalize(text)
    head = t[:400]
    if "table of contents" in head or re.search(r"\bcontents\b", head):
        return True

    lines = [ln.strip() for ln in t.splitlines() if ln.strip()]
    if len(lines) < 10:
        return False

    # Many lines ending with digits and/or dotted leaders
    end_num = sum(1 for ln in lines if re.search(r"(\.{2,}|\s)\d{1,4}$", ln))
    dotted = sum(1 for ln in lines if "..." in ln or re.search(r"\.{4,}", ln))
    shortish = sum(1 for ln in lines if len(ln) <= 60)

    if end_num / len(lines) >= 0.35 and (dotted + end_num) / len(lines) >= 0.35 and shortish / len(lines) >= 0.5:
        return True
    return False


def _looks_like_index(text: str) -> bool:
    t = _normalize(text)
    head = t[:400]
    if re.search(r"^\s*index\b", head):
        return True

    lines = [ln.strip() for ln in t.splitlines() if ln.strip()]
    if len(lines) < 15:
        return False

    indexish = 0
    for ln in lines[:200]:
        if re.search(r"\b\d{1,4}(?:,\s*\d{1,4}){2,}\b", ln):
            indexish += 1
            continue
        if re.search(r"^[a-z].{1,60}\s+\d{1,4}(?:,\s*\d{1,4})+\b", ln):
            indexish += 1
            continue
    return indexish >= max(10, math.ceil(0.25 * min(len(lines), 200)))


def _looks_like_front_matter(text: str) -> bool:
    t = _normalize(text)
    head = t[:800]
    if any(p in head for p in _JUNK_PHRASES):
        return True
    # Too little prose
    toks = _tokenish(head)
    if len(toks) < 80 and (("isbn" in head) or ("copyright" in head)):
        return True
    return False


def _is_junk(text: str) -> str | None:
    if _looks_like_toc(text):
        return "toc"
    if _looks_like_index(text):
        return "index"
    if _looks_like_front_matter(text):
        return "front_matter"
    return None


def _iter_files(root: Path, include: list[str], exclude: list[str]) -> Iterable[Path]:
    exts = {".pdf", ".epub"}
    for p in root.rglob("*"):
        if p.is_file() and p.suffix.lower() in exts:
            path_lower = str(p).lower()
            if include and not any(token in path_lower for token in include):
                continue
            if exclude and any(token in path_lower for token in exclude):
                continue
            yield p


def _extract_pdf_segments(path: Path) -> list[tuple[str, str]]:
    from pypdf import PdfReader

    reader = PdfReader(str(path))
    out: list[tuple[str, str]] = []
    for i, page in enumerate(reader.pages, start=1):
        try:
            txt = page.extract_text() or ""
        except Exception:
            # Some pages fail text extraction; treat them as empty.
            txt = ""
        out.append((f"page:{i}", txt))
    return out


def _extract_epub_segments(path: Path) -> list[tuple[str, str]]:
    from bs4 import BeautifulSoup
    from ebooklib import ITEM_DOCUMENT, epub

    book = epub.read_epub(str(path))
    out: list[tuple[str, str]] = []
    idx = 0
    for item in book.get_items():
        if item.get_type() != ITEM_DOCUMENT:
            continue
        idx += 1
        soup = BeautifulSoup(item.get_body_content(), "lxml")
        txt = soup.get_text("\n", strip=True)
        name = getattr(item, "file_name", None) or f"doc:{idx}"
        out.append((f"section:{name}", txt))
    return out


def main() -> int:
    parser = argparse.ArgumentParser(description="Select option-trading-relevant pages/sections from PDFs/EPUBs.")
    parser.add_argument("--input", type=Path, default=Path("eBooks"))
    parser.add_argument("--out", type=Path, default=Path("training_data/relevant"))
    parser.add_argument("--min-score", type=float, default=10.0)
    parser.add_argument(
        "--front-matter-score",
        type=float,
        default=None,
        help="Minimum score for front-matter-looking segments (defaults to --min-score).",
    )
    parser.add_argument("--min-chars", type=int, default=800)
    parser.add_argument("--neighbors", type=int, default=1, help="Include +/- N neighbor pages/sections around hits.")
    parser.add_argument(
        "--include",
        action="append",
        default=[],
        help="Only include files whose path contains this substring (case-insensitive).",
    )
    parser.add_argument(
        "--exclude",
        action="append",
        default=[],
        help="Skip files whose path contains this substring (case-insensitive).",
    )
    args = parser.parse_args()

    out_dir: Path = args.out
    text_dir = out_dir / "text"
    out_dir.mkdir(parents=True, exist_ok=True)
    text_dir.mkdir(parents=True, exist_ok=True)
    front_matter_min_score = args.front_matter_score if args.front_matter_score is not None else args.min_score
    include = [token.lower() for token in args.include]
    exclude = [token.lower() for token in args.exclude]

    seen_hashes: set[str] = set()
    selected: list[dict] = []
    report_rows: list[dict] = []

    for file_path in sorted(_iter_files(args.input, include, exclude)):
        suffix = file_path.suffix.lower()
        if suffix == ".pdf":
            segs = _extract_pdf_segments(file_path)
        elif suffix == ".epub":
            segs = _extract_epub_segments(file_path)
        else:
            continue

        # Score every segment in this file, noting why a segment would be skipped.
        scored: list[tuple[int, str, str, float, dict[str, int], str | None]] = []
        for idx, (loc, txt) in enumerate(segs):
            if not txt or len(txt) < args.min_chars:
                scored.append((idx, loc, txt, 0.0, {}, "too_short"))
                continue
            score, hits = _keyword_score(txt)
            junk = _is_junk(txt)
            scored.append((idx, loc, txt, score, hits, junk))

        # Keep segments that clear the score threshold, plus +/- N neighbors for context.
        keep_indices: set[int] = set()
        for idx, loc, txt, score, hits, junk in scored:
            if junk in {"toc", "index"}:
                continue
            if junk == "front_matter" and score < front_matter_min_score:
                continue
            if score < args.min_score:
                continue
            keep_indices.add(idx)
            for d in range(1, args.neighbors + 1):
                keep_indices.add(idx - d)
                keep_indices.add(idx + d)

        keep_indices = {i for i in keep_indices if 0 <= i < len(scored)}

        for idx, loc, txt, score, hits, junk in scored:
            if idx not in keep_indices:
                continue
            if not txt or len(txt) < args.min_chars:
                continue
            if junk in {"toc", "index"}:
                continue
            # Neighbor pages may be front matter; keep them only if they still clear
            # the front-matter score threshold.
            if junk == "front_matter" and score < front_matter_min_score:
                continue

            # Deduplicate across files by hash of the normalized text.
            norm = _normalize(txt)
            seg_hash = _sha256(norm)
            if seg_hash in seen_hashes:
                continue
            seen_hashes.add(seg_hash)

            (text_dir / f"{seg_hash}.txt").write_text(txt, encoding="utf-8", errors="ignore")

            src = str(file_path)
            primary = f"{src}#{loc}"
            selected.append(
                {
                    "id": seg_hash,
                    "primary": primary,
                    "duplicates": [],
                    "chars": len(txt),
                    "score": score,
                    "hits": hits,
                }
            )
            report_rows.append(
                {
                    "id": seg_hash,
                    "source": src,
                    "locator": loc,
                    "score": f"{score:.2f}",
                    "chars": str(len(txt)),
                    "junk": junk or "",
                    "top_hits": ";".join(sorted(hits.keys())[:12]),
                }
            )

    manifest_path = out_dir / "manifest.json"
    manifest_path.write_text(json.dumps({"docs": selected}, indent=2), encoding="utf-8")

    report_path = out_dir / "report.csv"
    with report_path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=["id", "source", "locator", "score", "chars", "junk", "top_hits"],
        )
        writer.writeheader()
        writer.writerows(report_rows)

    corpus_path = out_dir / "corpus.txt"
    with corpus_path.open("w", encoding="utf-8", errors="ignore") as f:
        for doc in selected:
            f.write("\n\n" + "=" * 80 + "\n")
            f.write(f"SOURCE: {doc['primary']}\n")
            f.write(f"SCORE: {doc.get('score', 0):.2f}\n")
            f.write("=" * 80 + "\n\n")
            f.write((text_dir / f"{doc['id']}.txt").read_text(encoding="utf-8", errors="ignore"))
            f.write("\n")

    print(f"Selected segments: {len(selected)}")
    print(f"Manifest: {manifest_path}")
    print(f"Report: {report_path}")
    print(f"Corpus: {corpus_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())