Compare commits

...

6 Commits

Author         SHA1        Message                                                         Date
Rushabh Gosar  83a5e843c0  Feature: Merge Truth Social scraper logic into SimpleScraper   2026-01-09 18:30:39 -08:00
Rushabh Gosar  4e02c6ce0a  Fix: Remove default MSFT symbol to prevent silent data errors  2026-01-09 17:32:10 -08:00
Rushabh Gosar  c01a98abce  Prune profile payload for options thesis                       2025-12-29 12:27:30 -08:00
Rushabh Gosar  68805ed80a  Add profile endpoint and validation                            2025-12-29 00:45:13 -08:00
               711d87a998  Add GPU-aware launch and testing docs                           2025-12-28 12:19:53 -08:00
               bce40014ad  Add strikeLimit parameter and refresh docs                      2025-12-28 11:51:06 -08:00
4 changed files with 1942 additions and 599 deletions

AGENTS.md (1316 lines changed; diff suppressed because it is too large)

@@ -2,11 +2,13 @@ from flask import Flask, jsonify, request
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from datetime import datetime, timezone
import html
import urllib.parse
import logging
import json
import re
import time
import os
app = Flask(__name__)
@@ -25,6 +27,49 @@ DATE_FORMATS = (
"%B %d, %Y", "%B %d, %Y",
) )
GPU_ACCEL_ENV = "ENABLE_GPU"
def parse_env_flag(value, default=False):
if value is None:
return default
return str(value).strip().lower() in ("1", "true", "yes", "on")
def detect_gpu_available():
env_value = os.getenv(GPU_ACCEL_ENV)
if env_value is not None:
return parse_env_flag(env_value, default=False)
nvidia_visible = os.getenv("NVIDIA_VISIBLE_DEVICES")
if nvidia_visible and nvidia_visible.lower() not in ("none", "void", "off"):
return True
if os.path.exists("/dev/nvidia0"):
return True
if os.path.exists("/dev/dri/renderD128") or os.path.exists("/dev/dri/card0"):
return True
return False
def chromium_launch_args():
if not detect_gpu_available():
return []
if os.name == "nt":
return ["--enable-gpu"]
return [
"--enable-gpu",
"--ignore-gpu-blocklist",
"--disable-software-rasterizer",
"--use-gl=egl",
"--enable-zero-copy",
"--enable-gpu-rasterization",
]
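
The detection order above is deliberate: an explicit ENABLE_GPU setting always wins, then NVIDIA container hints, then /dev probing. A minimal sketch of exercising each path (assumes these helpers are importable from the app module):

import os

os.environ["ENABLE_GPU"] = "true"          # explicit override: flags on
assert parse_env_flag(os.environ["ENABLE_GPU"]) is True

os.environ["ENABLE_GPU"] = "0"             # explicit override: flags off,
assert detect_gpu_available() is False     # even inside an NVIDIA container

del os.environ["ENABLE_GPU"]               # no override: fall back to
print(chromium_launch_args())              # device probing; [] on CPU-only hosts
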
def parse_date(value):
for fmt in DATE_FORMATS:
@@ -61,6 +106,14 @@ def extract_raw_value(value):
return value
def extract_value(value):
if isinstance(value, dict):
if value.get("raw") is not None:
return value.get("raw")
return value.get("fmt")
return value
def extract_fmt_value(value):
if isinstance(value, dict):
return value.get("fmt")
@@ -299,7 +352,631 @@ def wait_for_tables(page):
return []
-def scrape_yahoo_options(symbol, expiration=None):
+def parse_strike_limit(value, default=25):
if value is None:
return default
try:
limit = int(value)
except (TypeError, ValueError):
return default
return limit if limit > 0 else default
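
Anything that is not a positive integer quietly falls back to the default rather than erroring, so a malformed query string cannot zero out the chains:

parse_strike_limit("10")   # -> 10
parse_strike_limit("abc")  # -> 25 (unparseable)
parse_strike_limit("-5")   # -> 25 (non-positive)
parse_strike_limit(None)   # -> 25 (parameter absent)
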
def parse_sveltekit_payload(raw_text):
if not raw_text:
return None
try:
outer = json.loads(raw_text)
except json.JSONDecodeError:
return None
body = outer.get("body")
if isinstance(body, str):
try:
body = json.loads(body)
except json.JSONDecodeError:
pass
return {
"status": outer.get("status"),
"statusText": outer.get("statusText"),
"body": body,
}
def extract_sveltekit_payloads_from_soup(soup):
payloads = {}
if soup is None:
return payloads
scripts = soup.select('script[type="application/json"][data-sveltekit-fetched]')
for script in scripts:
url = script.get("data-url")
if not url:
continue
url = html.unescape(url)
raw_text = script.string or script.get_text()
payload = parse_sveltekit_payload(raw_text)
if not payload:
continue
payloads[url] = payload
return payloads
def select_payload(payloads, needle, symbol=None):
if not payloads:
return None, None
needle = needle.lower()
symbol_token = symbol.lower() if symbol else None
fallback = None
for url, payload in payloads.items():
url_lower = url.lower()
if needle not in url_lower:
continue
if symbol_token:
if f"/{symbol_token}" in url_lower or f"symbols={symbol_token}" in url_lower:
return url, payload.get("body")
if fallback is None:
fallback = (url, payload.get("body"))
return fallback if fallback else (None, None)
def extract_quote_summary(payload):
if not payload:
return None
summary = payload.get("quoteSummary")
if not summary:
return None
result = summary.get("result") or []
return result[0] if result else None
def extract_quote_response(payload):
if not payload:
return None
response = payload.get("quoteResponse")
if not response:
return None
result = response.get("result") or []
return result[0] if result else None
def extract_quote_type(payload):
if not payload:
return None
quote_type = payload.get("quoteType")
if not quote_type:
return None
result = quote_type.get("result") or []
return result[0] if result else None
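
For context: these helpers never call the Yahoo API directly; they lift the responses SvelteKit prefetches into the page as <script type="application/json" data-sveltekit-fetched data-url="..."> tags, whose body field is itself a JSON-encoded string. A self-contained sketch of the round trip (the URL and values here are made up):

import json
from bs4 import BeautifulSoup

inner = {"quoteSummary": {"result": [{"summaryDetail": {"beta": {"raw": 1.2}}}]}}
outer = json.dumps({"status": 200, "statusText": "OK", "body": json.dumps(inner)})
html_doc = (
    '<script type="application/json" data-sveltekit-fetched '
    'data-url="https://example.invalid/v10/finance/quoteSummary/AAPL?modules=summaryDetail">'
    + outer
    + "</script>"
)

payloads = extract_sveltekit_payloads_from_soup(BeautifulSoup(html_doc, "html.parser"))
url, body = select_payload(payloads, "quoteSummary", symbol="AAPL")
print(extract_quote_summary(body))  # {'summaryDetail': {'beta': {'raw': 1.2}}}
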
def extract_recent_news_from_soup(soup, limit=20):
items = []
if soup is None:
return items
container = soup.select_one('[data-testid="recent-news"]')
root = container if container else soup
seen = set()
for item in root.select('[data-testid="storyitem"]'):
title_el = item.select_one("h3")
link_el = item.select_one("a[href]")
if not title_el and not link_el:
continue
title = title_el.get_text(strip=True) if title_el else None
link = link_el.get("href") if link_el else None
publisher = None
published = None
publishing = item.select_one(".publishing")
if publishing:
text = " ".join(publishing.stripped_strings)
if "\u2022" in text:
parts = [part.strip() for part in text.split("\u2022", 1)]
publisher = parts[0] or None
published = parts[1] if len(parts) > 1 else None
else:
publisher = text or None
key = link or title
if key and key in seen:
continue
if key:
seen.add(key)
items.append(
{
"title": title,
"publisher": publisher,
"published": published,
"link": link,
}
)
if limit and len(items) >= limit:
break
return items
def extract_news_summary_from_soup(soup):
if soup is None:
return None
summary = soup.select_one('[data-testid="ticker-news-summary"]')
if not summary:
return None
text = " ".join(summary.stripped_strings)
return text if text else None
def build_profile_key_metrics(summary_detail, key_stats, financial_data, price_data, quote):
summary_detail = summary_detail or {}
key_stats = key_stats or {}
financial_data = financial_data or {}
price_data = price_data or {}
quote = quote or {}
def pick_value(*values):
for value in values:
if value is not None:
return value
return None
return {
"previous_close": extract_raw_value(summary_detail.get("previousClose")),
"open": extract_raw_value(summary_detail.get("open")),
"bid": extract_raw_value(summary_detail.get("bid")),
"ask": extract_raw_value(summary_detail.get("ask")),
"bid_size": extract_raw_value(summary_detail.get("bidSize")),
"ask_size": extract_raw_value(summary_detail.get("askSize")),
"day_low": extract_raw_value(summary_detail.get("dayLow")),
"day_high": extract_raw_value(summary_detail.get("dayHigh")),
"fifty_two_week_low": extract_raw_value(quote.get("fiftyTwoWeekLow")),
"fifty_two_week_high": extract_raw_value(quote.get("fiftyTwoWeekHigh")),
"volume": pick_value(
extract_raw_value(summary_detail.get("volume")),
extract_raw_value(price_data.get("regularMarketVolume")),
extract_raw_value(quote.get("regularMarketVolume")),
),
"average_volume": pick_value(
extract_raw_value(summary_detail.get("averageVolume")),
extract_raw_value(price_data.get("averageDailyVolume3Month")),
),
"market_cap": pick_value(
extract_raw_value(summary_detail.get("marketCap")),
extract_raw_value(quote.get("marketCap")),
),
"beta": pick_value(
extract_raw_value(summary_detail.get("beta")),
extract_raw_value(key_stats.get("beta")),
),
"trailing_pe": pick_value(
extract_raw_value(summary_detail.get("trailingPE")),
extract_raw_value(key_stats.get("trailingPE")),
),
"forward_pe": pick_value(
extract_raw_value(summary_detail.get("forwardPE")),
extract_raw_value(key_stats.get("forwardPE")),
),
"eps_trailing": extract_raw_value(key_stats.get("trailingEps")),
"eps_forward": extract_raw_value(key_stats.get("forwardEps")),
"dividend_rate": extract_raw_value(summary_detail.get("dividendRate")),
"dividend_yield": extract_raw_value(summary_detail.get("dividendYield")),
"ex_dividend_date": extract_raw_value(summary_detail.get("exDividendDate")),
"payout_ratio": extract_raw_value(summary_detail.get("payoutRatio")),
"implied_volatility": extract_raw_value(summary_detail.get("impliedVolatility")),
"current_price": pick_value(
extract_raw_value(price_data.get("regularMarketPrice")),
extract_raw_value(financial_data.get("currentPrice")),
extract_raw_value(quote.get("regularMarketPrice")),
),
"recommendation_key": financial_data.get("recommendationKey"),
"recommendation_mean": extract_raw_value(financial_data.get("recommendationMean")),
"target_price_high": extract_raw_value(financial_data.get("targetHighPrice")),
"target_price_low": extract_raw_value(financial_data.get("targetLowPrice")),
"target_price_mean": extract_raw_value(financial_data.get("targetMeanPrice")),
"target_price_median": extract_raw_value(financial_data.get("targetMedianPrice")),
"analyst_opinion_count": extract_raw_value(
financial_data.get("numberOfAnalystOpinions")
),
}
def simplify_recommendation_trend(trend):
simplified = []
for entry in trend or []:
simplified.append(
{
"period": entry.get("period"),
"strong_buy": entry.get("strongBuy"),
"buy": entry.get("buy"),
"hold": entry.get("hold"),
"sell": entry.get("sell"),
"strong_sell": entry.get("strongSell"),
}
)
return simplified
def simplify_upgrade_history(history, limit=20):
simplified = []
for entry in history or []:
simplified.append(
{
"firm": entry.get("firm"),
"action": entry.get("action"),
"from_grade": entry.get("fromGrade"),
"to_grade": entry.get("toGrade"),
"date": entry.get("epochGradeDate") or entry.get("gradeDate"),
}
)
if limit and len(simplified) >= limit:
break
return simplified
def simplify_ratings_top(payload):
if not payload:
return None
simplified = {}
for key, value in payload.items():
if not isinstance(value, dict):
continue
simplified[key] = {
"analyst": value.get("analyst"),
"rating_current": value.get("rating_current"),
"rating_sentiment": value.get("rating_sentiment"),
"pt_current": value.get("pt_current"),
"adjusted_pt_current": value.get("adjusted_pt_current"),
"announcement_date": value.get("announcement_date"),
"datapoints": value.get("datapoints"),
"scores": {
"dir": extract_value(value.get("dir")),
"mm": extract_value(value.get("mm")),
"pt": extract_value(value.get("pt")),
"fin_score": extract_value(value.get("fin_score")),
},
}
return simplified or None
def summarize_performance(perf_data):
if not perf_data:
return {}
overview = perf_data.get("performanceOverview")
if isinstance(overview, dict):
return {
"as_of_date": extract_value(overview.get("asOfDate")),
"returns": {
"five_day": extract_value(overview.get("fiveDaysReturn")),
"one_month": extract_value(overview.get("oneMonthReturn")),
"three_month": extract_value(overview.get("threeMonthReturn")),
"six_month": extract_value(overview.get("sixMonthReturn")),
"ytd": extract_value(overview.get("ytdReturnPct")),
"one_year": extract_value(overview.get("oneYearTotalReturn")),
"two_year": extract_value(overview.get("twoYearTotalReturn")),
"three_year": extract_value(overview.get("threeYearTotalReturn")),
"five_year": extract_value(overview.get("fiveYearTotalReturn")),
"ten_year": extract_value(overview.get("tenYearTotalReturn")),
"max": extract_value(overview.get("maxReturn")),
},
}
summary = []
for entry in overview or []:
if not isinstance(entry, dict):
continue
summary.append(
{
"period": entry.get("period"),
"performance": extract_value(entry.get("performance")),
"benchmark": extract_value(entry.get("benchmark")),
}
)
return {"periods": summary} if summary else {}
def summarize_earnings(earnings, calendar_events):
earnings = earnings or {}
calendar_events = calendar_events or {}
earnings_chart = earnings.get("earningsChart", {}) or {}
financials_chart = earnings.get("financialsChart", {}) or {}
calendar_earnings = calendar_events.get("earnings", {}) or {}
quarterly = []
for entry in earnings_chart.get("quarterly") or []:
quarterly.append(
{
"quarter": entry.get("date"),
"actual": extract_value(entry.get("actual")),
"estimate": extract_value(entry.get("estimate")),
"surprise": extract_value(entry.get("difference")),
"surprise_percent": extract_value(entry.get("surprisePct")),
}
)
yearly = []
for entry in financials_chart.get("yearly") or []:
yearly.append(
{
"year": entry.get("date"),
"revenue": extract_value(entry.get("revenue")),
"earnings": extract_value(entry.get("earnings")),
}
)
quarterly_financials = []
for entry in financials_chart.get("quarterly") or []:
quarterly_financials.append(
{
"quarter": entry.get("date"),
"revenue": extract_value(entry.get("revenue")),
"earnings": extract_value(entry.get("earnings")),
}
)
return {
"next_earnings_dates": [
extract_value(value) for value in calendar_earnings.get("earningsDate", []) or []
],
"is_earnings_date_estimate": calendar_earnings.get("isEarningsDateEstimate"),
"earnings_estimates": {
"average": extract_value(calendar_earnings.get("earningsAverage")),
"low": extract_value(calendar_earnings.get("earningsLow")),
"high": extract_value(calendar_earnings.get("earningsHigh")),
},
"revenue_estimates": {
"average": extract_value(calendar_earnings.get("revenueAverage")),
"low": extract_value(calendar_earnings.get("revenueLow")),
"high": extract_value(calendar_earnings.get("revenueHigh")),
},
"quarterly_earnings": quarterly[:4],
"yearly_financials": yearly[:4],
"quarterly_financials": quarterly_financials[:4],
"current_quarter_estimate": extract_value(
earnings_chart.get("currentQuarterEstimate")
),
"current_quarter_estimate_date": earnings_chart.get("currentQuarterEstimateDate"),
"current_calendar_quarter": earnings_chart.get("currentCalendarQuarter"),
"current_fiscal_quarter": earnings_chart.get("currentFiscalQuarter"),
}
def scrape_yahoo_profile(symbol):
encoded = urllib.parse.quote(symbol, safe="")
url = f"https://finance.yahoo.com/quote/{encoded}/"
app.logger.info("Starting profile scrape for symbol=%s url=%s", symbol, url)
response_html = None
rendered_html = None
payloads = {}
with sync_playwright() as p:
launch_args = chromium_launch_args()
if launch_args:
app.logger.info("GPU acceleration enabled")
else:
app.logger.info("GPU acceleration disabled")
browser = p.chromium.launch(headless=True, args=launch_args)
page = browser.new_page()
page.set_extra_http_headers(
{
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
)
}
)
page.set_default_timeout(60000)
try:
response = page.goto(url, wait_until="domcontentloaded", timeout=60000)
app.logger.info("Profile page loaded (domcontentloaded) for %s", symbol)
if response:
response_html = response.text()
else:
app.logger.warning("No response body for profile page %s", symbol)
page.wait_for_timeout(1000)
rendered_html = page.content()
finally:
browser.close()
if not response_html and not rendered_html:
return {"error": "Profile page content missing", "stock": symbol, "url": url}
payload_source = response_html or rendered_html
payload_soup = BeautifulSoup(payload_source, "html.parser") if payload_source else None
payloads = extract_sveltekit_payloads_from_soup(payload_soup)
if not payloads and rendered_html and rendered_html != payload_source:
fallback_soup = BeautifulSoup(rendered_html, "html.parser")
payloads = extract_sveltekit_payloads_from_soup(fallback_soup)
if not payloads:
return {
"error": "No embedded payloads found on profile page",
"stock": symbol,
"url": url,
}
quote_summary_url, quote_summary_payload = select_payload(
payloads, "quoteSummary", symbol
)
quote_url, quote_payload = select_payload(payloads, "v7/finance/quote?", symbol)
quote_type_url, quote_type_payload = select_payload(
payloads, "/v1/finance/quoteType/", symbol
)
ratings_url, ratings_payload = select_payload(payloads, "ratings/top", symbol)
recs_url, recs_payload = select_payload(
payloads, "recommendationsbysymbol", symbol
)
quote_summary = extract_quote_summary(quote_summary_payload)
quote = extract_quote_response(quote_payload)
quote_type = extract_quote_type(quote_type_payload)
summary_detail = quote_summary.get("summaryDetail", {}) if quote_summary else {}
key_stats = quote_summary.get("defaultKeyStatistics", {}) if quote_summary else {}
financial_data = quote_summary.get("financialData", {}) if quote_summary else {}
price_data = quote_summary.get("price", {}) if quote_summary else {}
recommendation_trend = (
quote_summary.get("recommendationTrend", {}) if quote_summary else {}
)
upgrade_history = (
quote_summary.get("upgradeDowngradeHistory", {}) if quote_summary else {}
)
earnings = quote_summary.get("earnings", {}) if quote_summary else {}
calendar_events = quote_summary.get("calendarEvents", {}) if quote_summary else {}
equity_performance = (
quote_summary.get("equityPerformance", {}) if quote_summary else {}
)
performance_overview = (
quote_summary.get("quoteUnadjustedPerformanceOverview", {})
if quote_summary
else {}
)
key_metrics = build_profile_key_metrics(
summary_detail, key_stats, financial_data, price_data, quote
)
valuation = {
"market_cap": extract_raw_value(key_stats.get("marketCap")),
"enterprise_value": extract_raw_value(key_stats.get("enterpriseValue")),
"price_to_book": extract_raw_value(key_stats.get("priceToBook")),
"price_to_sales": extract_raw_value(key_stats.get("priceToSalesTrailing12Months")),
"trailing_pe": key_metrics.get("trailing_pe"),
"forward_pe": key_metrics.get("forward_pe"),
}
profitability = {
"profit_margins": extract_raw_value(financial_data.get("profitMargins")),
"operating_margins": extract_raw_value(financial_data.get("operatingMargins")),
"gross_margins": extract_raw_value(financial_data.get("grossMargins")),
"ebitda_margins": extract_raw_value(financial_data.get("ebitdaMargins")),
"return_on_assets": extract_raw_value(financial_data.get("returnOnAssets")),
"return_on_equity": extract_raw_value(financial_data.get("returnOnEquity")),
}
growth = {
"revenue_growth": extract_raw_value(financial_data.get("revenueGrowth")),
"earnings_growth": extract_raw_value(financial_data.get("earningsGrowth")),
"revenue_per_share": extract_raw_value(financial_data.get("revenuePerShare")),
}
financial_strength = {
"total_cash": extract_raw_value(financial_data.get("totalCash")),
"total_debt": extract_raw_value(financial_data.get("totalDebt")),
"debt_to_equity": extract_raw_value(financial_data.get("debtToEquity")),
"current_ratio": extract_raw_value(financial_data.get("currentRatio")),
"quick_ratio": extract_raw_value(financial_data.get("quickRatio")),
}
cashflow = {
"operating_cashflow": extract_raw_value(financial_data.get("operatingCashflow")),
"free_cashflow": extract_raw_value(financial_data.get("freeCashflow")),
"ebitda": extract_raw_value(financial_data.get("ebitda")),
}
ownership = {
"shares_outstanding": extract_raw_value(key_stats.get("sharesOutstanding")),
"float_shares": extract_raw_value(key_stats.get("floatShares")),
"shares_short": extract_raw_value(key_stats.get("sharesShort")),
"short_ratio": extract_raw_value(key_stats.get("shortRatio")),
"short_percent_of_float": extract_raw_value(key_stats.get("shortPercentOfFloat")),
"held_percent_insiders": extract_raw_value(key_stats.get("heldPercentInsiders")),
"held_percent_institutions": extract_raw_value(
key_stats.get("heldPercentInstitutions")
),
}
analyst = {
"recommendation": {
"key": key_metrics.get("recommendation_key"),
"mean": key_metrics.get("recommendation_mean"),
"analyst_opinion_count": key_metrics.get("analyst_opinion_count"),
"target_price_high": key_metrics.get("target_price_high"),
"target_price_low": key_metrics.get("target_price_low"),
"target_price_mean": key_metrics.get("target_price_mean"),
"target_price_median": key_metrics.get("target_price_median"),
},
"trend": simplify_recommendation_trend(recommendation_trend.get("trend")),
"upgrades_downgrades": simplify_upgrade_history(
upgrade_history.get("history"), limit=20
),
"ratings_top": simplify_ratings_top(ratings_payload),
}
earnings_summary = summarize_earnings(earnings, calendar_events)
performance_summary = {
"equity_performance": summarize_performance(equity_performance),
"unadjusted_performance": summarize_performance(performance_overview),
}
matched_symbols = []
for candidate in [
price_data.get("symbol") if price_data else None,
quote.get("symbol") if quote else None,
quote_type.get("symbol") if quote_type else None,
]:
if candidate:
matched_symbols.append(candidate)
symbol_match = None
if matched_symbols:
symbol_match = any(
candidate.upper() == symbol.upper() for candidate in matched_symbols
)
issues = []
if not quote_summary:
issues.append("missing_quote_summary")
if matched_symbols and not symbol_match:
issues.append("symbol_mismatch")
if not quote:
issues.append("missing_quote_data")
if not quote_type:
issues.append("missing_quote_type")
validation = {
"requested_symbol": symbol,
"matched_symbols": matched_symbols,
"symbol_match": symbol_match,
"issues": issues,
}
if "missing_quote_summary" in issues or "symbol_mismatch" in issues:
return {
"error": "Profile validation failed",
"stock": symbol,
"url": url,
"validation": validation,
"data_sources": {
"quote_summary": quote_summary_url,
"quote": quote_url,
"quote_type": quote_type_url,
"ratings_top": ratings_url,
"recommendations": recs_url,
},
}
return {
"stock": symbol,
"url": url,
"fetched_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"validation": validation,
"key_metrics": key_metrics,
"valuation": valuation,
"profitability": profitability,
"growth": growth,
"financial_strength": financial_strength,
"cashflow": cashflow,
"ownership": ownership,
"analyst": analyst,
"earnings": earnings_summary,
"performance": performance_summary,
"data_sources": {
"quote_summary": quote_summary_url,
"quote": quote_url,
"quote_type": quote_type_url,
"ratings_top": ratings_url,
"recommendations": recs_url,
},
}
def scrape_yahoo_options(symbol, expiration=None, strike_limit=25):
def parse_table(table_html, side):
if not table_html:
app.logger.warning("No %s table HTML for %s", side, symbol)
@@ -386,7 +1063,12 @@ def scrape_yahoo_options(symbol, expiration=None):
fallback_to_base = False
with sync_playwright() as p:
-browser = p.chromium.launch(headless=True)
+launch_args = chromium_launch_args()
if launch_args:
app.logger.info("GPU acceleration enabled")
else:
app.logger.info("GPU acceleration disabled")
browser = p.chromium.launch(headless=True, args=launch_args)
page = browser.new_page()
page.set_extra_http_headers(
{
@@ -544,7 +1226,7 @@ def scrape_yahoo_options(symbol, expiration=None):
# ----------------------------------------------------------------------
# Pruning logic
# ----------------------------------------------------------------------
-def prune_nearest(options, price_value, limit=26, side=""):
+def prune_nearest(options, price_value, limit=25, side=""):
if price_value is None:
return options, 0
@@ -558,8 +1240,18 @@ def scrape_yahoo_options(symbol, expiration=None):
pruned_count = len(options) - len(pruned)
return pruned, pruned_count
-calls, pruned_calls = prune_nearest(calls_full, price, side="calls")
-puts, pruned_puts = prune_nearest(puts_full, price, side="puts")
+calls, pruned_calls = prune_nearest(
+    calls_full,
+    price,
+    limit=strike_limit,
+    side="calls",
+)
+puts, pruned_puts = prune_nearest(
+    puts_full,
+    price,
+    limit=strike_limit,
+    side="puts",
+)
def strike_range(opts):
strikes = [o["Strike"] for o in opts if isinstance(o.get("Strike"), (int, float))]
@@ -587,18 +1279,191 @@ def scrape_yahoo_options(symbol, expiration=None):
@app.route("/scrape_sync") @app.route("/scrape_sync")
def scrape_sync(): def scrape_sync():
symbol = request.args.get("stock", "MSFT") symbol = request.args.get("stock")
if not symbol:
return jsonify({"error": "Missing 'stock' parameter"}), 400
expiration = (
request.args.get("expiration")
or request.args.get("expiry")
or request.args.get("date")
)
strike_limit = parse_strike_limit(request.args.get("strikeLimit"), default=25)
app.logger.info(
"Received /scrape_sync request for symbol=%s expiration=%s", "Received /scrape_sync request for symbol=%s expiration=%s strike_limit=%s",
symbol,
expiration,
strike_limit,
)
-return jsonify(scrape_yahoo_options(symbol, expiration))
+return jsonify(scrape_yahoo_options(symbol, expiration, strike_limit))
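
With the MSFT fallback gone, callers must always pass stock; strikeLimit stays optional. A typical request, assuming the server is listening on 127.0.0.1:9777 as in scripts/test_cycles.py (symbol and expiration epoch are example values):

import json
import urllib.parse
import urllib.request

params = urllib.parse.urlencode(
    {"stock": "AAPL", "expiration": 1767916800, "strikeLimit": 10}
)
url = f"http://127.0.0.1:9777/scrape_sync?{params}"
with urllib.request.urlopen(url, timeout=180) as resp:
    data = json.loads(resp.read().decode("utf-8"))
print(data["total_calls"], data["total_puts"])  # each capped at 10

The /profile route below takes the same required stock parameter.
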
@app.route("/profile")
def profile():
symbol = request.args.get("stock")
if not symbol:
return jsonify({"error": "Missing 'stock' parameter"}), 400
app.logger.info("Received /profile request for symbol=%s", symbol)
return jsonify(scrape_yahoo_profile(symbol))
def scrape_truths_sync(count=10, handle="realDonaldTrump"):
app.logger.info("Starting Truth Social scrape for handle=%s count=%d", handle, count)
with sync_playwright() as p:
launch_args = chromium_launch_args()
if launch_args:
app.logger.info("GPU acceleration enabled for Truth Social")
browser = p.chromium.launch(headless=True, args=launch_args)
context = browser.new_context(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
viewport={'width': 1920, 'height': 2000}
)
page = context.new_page()
try:
url = f"https://truthsocial.com/@{handle}"
app.logger.info("Navigating to %s", url)
page.goto(url, wait_until="domcontentloaded", timeout=60000)
# Wait for content to load
page.wait_for_timeout(5000)
# Scale down to fit more content
page.evaluate("document.body.style.zoom = '0.7'")
page.wait_for_timeout(2000)
# Handle potential modal/ad overlay
try:
close_btn = page.query_selector('[data-testid="close-modal"]')
if close_btn:
close_btn.click()
page.wait_for_timeout(1000)
page.keyboard.press("Escape")
except Exception:
pass
# Wait for any status to appear
selector = '[data-testid="status"]'
try:
page.wait_for_selector(selector, timeout=20000)
except Exception:
selector = '[data-id]'
page.wait_for_selector(selector, timeout=10000)
truths_data = []
seen_ids = set()
# Since virtual lists only render what is near the scroll position,
# we need a few small scrolls even with a tall viewport.
for scroll_step in range(10):
# Get all current statuses
statuses = page.query_selector_all(selector)
for status in statuses:
if len(truths_data) >= count:
break
try:
# Find post ID and validate it belongs to the target handle
links = status.query_selector_all('a')
post_id = None
for link in links:
href = link.get_attribute('href')
if href and f"/@{handle}/posts/" in href:
post_id = href
break
if not post_id or post_id in seen_ids:
continue
seen_ids.add(post_id)
# Content
content_el = status.query_selector('[data-testid="status-content"]')
if not content_el:
content_el = status.query_selector('[data-testid="markup"]')
content_text = content_el.inner_text() if content_el else ""
# Time
time_el = status.query_selector('time')
time_text = time_el.get_attribute('title') if time_el else ""
if not time_text and time_el:
time_text = time_el.inner_text()
# Counts
def get_btn_text(btn_selector):
btn = status.query_selector(btn_selector)
return btn.inner_text() if btn else "0"
reply_count = get_btn_text('button[aria-label="Reply"]')
retruth_count = get_btn_text('button[aria-label="ReTruth"]')
like_count = get_btn_text('button[aria-label="Like"]')
# Media
media_urls = []
imgs = status.query_selector_all('img')
for img in imgs:
alt = img.get_attribute('alt')
if alt in ["Avatar", "Profile header", "Logo", "Verified Account"]:
continue
src = img.get_attribute('src')
if src and ("static-assets" in src or "proxy" in src):
media_urls.append(src)
videos = status.query_selector_all('video')
for video in videos:
src = video.get_attribute('src')
if not src:
source_tag = video.query_selector('source')
if source_tag: src = source_tag.get_attribute('src')
if src: media_urls.append(src)
def clean(c):
return str(c).strip().replace('\n', '')
truths_data.append({
"id": post_id,
"content": content_text,
"time": time_text,
"likes_count": clean(like_count),
"comments_count": clean(reply_count),
"retruths_count": clean(retruth_count),
"media": list(set(media_urls))
})
except Exception:
continue
if len(truths_data) >= count:
break
# Scroll a bit to trigger next items
page.evaluate("window.scrollBy(0, 500)")
page.wait_for_timeout(1000)
app.logger.info("Scraped %d truths", len(truths_data))
return truths_data[:count]
except Exception as e:
app.logger.error("Truths scraper error: %s", e)
return {"error": str(e)}
finally:
browser.close()
@app.route("/truths")
def truths():
try:
count = int(request.args.get("count", 10))
except ValueError:
count = 10
handle = request.args.get("handle", "realDonaldTrump")
app.logger.info("Received /truths request for handle=%s count=%d", handle, count)
return jsonify(scrape_truths_sync(count, handle))
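
Same calling pattern for this endpoint; note that on a scrape failure the handler returns a JSON object with an error key instead of a list, so callers should check the shape (host and port assumed as above):

import json
import urllib.parse
import urllib.request

query = urllib.parse.urlencode({"handle": "realDonaldTrump", "count": 5})
with urllib.request.urlopen(f"http://127.0.0.1:9777/truths?{query}", timeout=180) as resp:
    truths = json.loads(resp.read().decode("utf-8"))
if isinstance(truths, dict) and "error" in truths:
    raise SystemExit(truths["error"])
for t in truths:
    print(t["time"], t["likes_count"], t["content"][:60])
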
if __name__ == "__main__":

scripts/test_cycles.py (new file, 199 lines)

@@ -0,0 +1,199 @@
import argparse
import datetime
import json
import sys
import time
import urllib.parse
import urllib.request
DEFAULT_STOCKS = ["AAPL", "AMZN", "MSFT", "TSLA"]
DEFAULT_CYCLES = [None, 5, 10, 25, 50, 75, 100, 150, 200, 500]
def http_get(base_url, params, timeout):
query = urllib.parse.urlencode(params)
url = f"{base_url}?{query}"
with urllib.request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
def expected_code_from_epoch(epoch):
return datetime.datetime.utcfromtimestamp(epoch).strftime("%y%m%d")
def all_contracts_match(opts, expected_code):
for opt in opts:
name = opt.get("Contract Name") or ""
if expected_code not in name:
return False
return True
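
These checks rely on OCC-style contract symbols embedding the expiration date as YYMMDD right after the ticker, so a substring test is enough (the contract name below is illustrative):

code = expected_code_from_epoch(1767916800)  # 2026-01-09 UTC
assert code == "260109"
assert all_contracts_match([{"Contract Name": "AAPL260109C00150000"}], code)
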
def parse_list(value, default):
if not value:
return default
return [item.strip() for item in value.split(",") if item.strip()]
def parse_cycles(value):
if not value:
return DEFAULT_CYCLES
cycles = []
for item in value.split(","):
token = item.strip().lower()
if not token or token in ("default", "none"):
cycles.append(None)
continue
try:
cycles.append(int(token))
except ValueError:
raise ValueError(f"Invalid strikeLimit value: {item}")
return cycles
def main():
parser = argparse.ArgumentParser(description="Yahoo options scraper test cycles")
parser.add_argument(
"--base-url",
default="http://127.0.0.1:9777/scrape_sync",
help="Base URL for /scrape_sync",
)
parser.add_argument(
"--stocks",
default=",".join(DEFAULT_STOCKS),
help="Comma-separated stock symbols",
)
parser.add_argument(
"--strike-limits",
default="default,5,10,25,50,75,100,150,200,500",
help="Comma-separated strike limits (use 'default' for the API default)",
)
parser.add_argument(
"--baseline-limit",
type=int,
default=5000,
help="Large strikeLimit used to capture all available strikes",
)
parser.add_argument(
"--timeout",
type=int,
default=180,
help="Request timeout in seconds",
)
parser.add_argument(
"--sleep",
type=float,
default=0.2,
help="Sleep between requests",
)
args = parser.parse_args()
stocks = parse_list(args.stocks, DEFAULT_STOCKS)
cycles = parse_cycles(args.strike_limits)
print("Fetching expiration lists...")
expirations = {}
for stock in stocks:
data = http_get(args.base_url, {"stock": stock, "expiration": "invalid"}, args.timeout)
if "available_expirations" not in data:
print(f"ERROR: missing available_expirations for {stock}: {data}")
sys.exit(1)
values = [opt.get("value") for opt in data["available_expirations"] if opt.get("value")]
if len(values) < 4:
print(f"ERROR: not enough expirations for {stock}: {values}")
sys.exit(1)
expirations[stock] = values[:4]
print(f" {stock}: {expirations[stock]}")
time.sleep(args.sleep)
print("\nBuilding baseline counts (strikeLimit=%d)..." % args.baseline_limit)
baseline_counts = {}
for stock, exp_list in expirations.items():
for exp in exp_list:
data = http_get(
args.base_url,
{"stock": stock, "expiration": exp, "strikeLimit": args.baseline_limit},
args.timeout,
)
if "error" in data:
print(f"ERROR: baseline error for {stock} {exp}: {data}")
sys.exit(1)
calls_count = data.get("total_calls")
puts_count = data.get("total_puts")
if calls_count is None or puts_count is None:
print(f"ERROR: baseline missing counts for {stock} {exp}: {data}")
sys.exit(1)
expected_code = expected_code_from_epoch(exp)
if not all_contracts_match(data.get("calls", []), expected_code):
print(f"ERROR: baseline calls mismatch for {stock} {exp}")
sys.exit(1)
if not all_contracts_match(data.get("puts", []), expected_code):
print(f"ERROR: baseline puts mismatch for {stock} {exp}")
sys.exit(1)
baseline_counts[(stock, exp)] = (calls_count, puts_count)
print(f" {stock} {exp}: calls={calls_count} puts={puts_count}")
time.sleep(args.sleep)
print("\nRunning %d cycles of API tests..." % len(cycles))
for idx, strike_limit in enumerate(cycles, start=1):
print(f"Cycle {idx}/{len(cycles)} (strikeLimit={strike_limit})")
for stock, exp_list in expirations.items():
for exp in exp_list:
params = {"stock": stock, "expiration": exp}
if strike_limit is not None:
params["strikeLimit"] = strike_limit
data = http_get(args.base_url, params, args.timeout)
if "error" in data:
print(f"ERROR: {stock} {exp} -> {data}")
sys.exit(1)
selected_val = data.get("selected_expiration", {}).get("value")
if selected_val != exp:
print(
f"ERROR: selected expiration mismatch for {stock} {exp}: {selected_val}"
)
sys.exit(1)
expected_code = expected_code_from_epoch(exp)
if not all_contracts_match(data.get("calls", []), expected_code):
print(f"ERROR: calls expiry mismatch for {stock} {exp}")
sys.exit(1)
if not all_contracts_match(data.get("puts", []), expected_code):
print(f"ERROR: puts expiry mismatch for {stock} {exp}")
sys.exit(1)
available_calls, available_puts = baseline_counts[(stock, exp)]
expected_limit = strike_limit if strike_limit is not None else 25
expected_calls = min(expected_limit, available_calls)
expected_puts = min(expected_limit, available_puts)
if data.get("total_calls") != expected_calls:
print(
f"ERROR: call count mismatch for {stock} {exp}: "
f"got {data.get('total_calls')} expected {expected_calls}"
)
sys.exit(1)
if data.get("total_puts") != expected_puts:
print(
f"ERROR: put count mismatch for {stock} {exp}: "
f"got {data.get('total_puts')} expected {expected_puts}"
)
sys.exit(1)
expected_pruned_calls = max(0, available_calls - expected_calls)
expected_pruned_puts = max(0, available_puts - expected_puts)
if data.get("pruned_calls_count") != expected_pruned_calls:
print(
f"ERROR: pruned calls mismatch for {stock} {exp}: "
f"got {data.get('pruned_calls_count')} expected {expected_pruned_calls}"
)
sys.exit(1)
if data.get("pruned_puts_count") != expected_pruned_puts:
print(
f"ERROR: pruned puts mismatch for {stock} {exp}: "
f"got {data.get('pruned_puts_count')} expected {expected_pruned_puts}"
)
sys.exit(1)
time.sleep(args.sleep)
print(f"Cycle {idx} OK")
print("\nAll cycles completed successfully.")
if __name__ == "__main__":
main()
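
Assuming the Flask service is already up on 127.0.0.1:9777, a typical run narrows the matrix with the flags above, for example: python scripts/test_cycles.py --stocks AAPL,MSFT --strike-limits default,10,50 --sleep 0.5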


@@ -0,0 +1,145 @@
import argparse
import json
import sys
import time
import urllib.parse
import urllib.request
DEFAULT_SYMBOLS = ["AAPL", "AMZN", "MSFT", "TSLA"]
REQUIRED_SECTIONS = [
"key_metrics",
"valuation",
"profitability",
"growth",
"financial_strength",
"cashflow",
"ownership",
"analyst",
"earnings",
"performance",
]
REQUIRED_KEY_METRICS = [
"previous_close",
"open",
"bid",
"ask",
"beta",
"eps_trailing",
"dividend_rate",
"current_price",
]
def http_get(base_url, params, timeout):
query = urllib.parse.urlencode(params)
url = f"{base_url}?{query}"
with urllib.request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
def parse_list(value, default):
if not value:
return default
return [item.strip() for item in value.split(",") if item.strip()]
def build_signature(data):
return {
"key_metrics_keys": sorted(data.get("key_metrics", {}).keys()),
"valuation_keys": sorted(data.get("valuation", {}).keys()),
"profitability_keys": sorted(data.get("profitability", {}).keys()),
"growth_keys": sorted(data.get("growth", {}).keys()),
"financial_strength_keys": sorted(data.get("financial_strength", {}).keys()),
"cashflow_keys": sorted(data.get("cashflow", {}).keys()),
"ownership_keys": sorted(data.get("ownership", {}).keys()),
"analyst_keys": sorted(data.get("analyst", {}).keys()),
"earnings_keys": sorted(data.get("earnings", {}).keys()),
"performance_keys": sorted(data.get("performance", {}).keys()),
}
def validate_payload(symbol, data):
if "error" in data:
return f"API error for {symbol}: {data}"
if data.get("stock", "").upper() != symbol.upper():
return f"Symbol mismatch: expected {symbol} got {data.get('stock')}"
validation = data.get("validation", {})
if validation.get("symbol_match") is not True:
return f"Validation symbol_match failed for {symbol}: {validation}"
if validation.get("issues"):
return f"Validation issues for {symbol}: {validation}"
for section in REQUIRED_SECTIONS:
if section not in data:
return f"Missing section {section} for {symbol}"
key_metrics = data.get("key_metrics", {})
for field in REQUIRED_KEY_METRICS:
if field not in key_metrics:
return f"Missing key metric {field} for {symbol}"
return None
def main():
parser = argparse.ArgumentParser(description="Yahoo profile scraper test cycles")
parser.add_argument(
"--base-url",
default="http://127.0.0.1:9777/profile",
help="Base URL for /profile",
)
parser.add_argument(
"--symbols",
default=",".join(DEFAULT_SYMBOLS),
help="Comma-separated stock symbols",
)
parser.add_argument(
"--runs",
type=int,
default=8,
help="Number of validation runs per symbol",
)
parser.add_argument(
"--timeout",
type=int,
default=180,
help="Request timeout in seconds",
)
parser.add_argument(
"--sleep",
type=float,
default=0.2,
help="Sleep between requests",
)
args = parser.parse_args()
symbols = parse_list(args.symbols, DEFAULT_SYMBOLS)
signatures = {}
print(f"Running {args.runs} profile cycles for: {', '.join(symbols)}")
for run in range(1, args.runs + 1):
print(f"Cycle {run}/{args.runs}")
for symbol in symbols:
data = http_get(args.base_url, {"stock": symbol}, args.timeout)
error = validate_payload(symbol, data)
if error:
print(f"ERROR: {error}")
sys.exit(1)
signature = build_signature(data)
if symbol not in signatures:
signatures[symbol] = signature
elif signatures[symbol] != signature:
print(f"ERROR: Signature changed for {symbol}")
print(f"Baseline: {signatures[symbol]}")
print(f"Current: {signature}")
sys.exit(1)
time.sleep(args.sleep)
print(f"Cycle {run} OK")
print("\nAll profile cycles completed successfully.")
if __name__ == "__main__":
main()