Files
SimpleScraper/scraper_service.py

1313 lines
44 KiB
Python

from flask import Flask, jsonify, request
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from datetime import datetime, timezone
import html
import urllib.parse
import logging
import json
import re
import time
import os
app = Flask(__name__)
# Logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
app.logger.setLevel(logging.INFO)
DATE_FORMATS = (
"%Y-%m-%d",
"%Y/%m/%d",
"%Y%m%d",
"%b %d, %Y",
"%B %d, %Y",
)
GPU_ACCEL_ENV = "ENABLE_GPU"
def parse_env_flag(value, default=False):
if value is None:
return default
return str(value).strip().lower() in ("1", "true", "yes", "on")
def detect_gpu_available():
env_value = os.getenv(GPU_ACCEL_ENV)
if env_value is not None:
return parse_env_flag(env_value, default=False)
nvidia_visible = os.getenv("NVIDIA_VISIBLE_DEVICES")
if nvidia_visible and nvidia_visible.lower() not in ("none", "void", "off"):
return True
if os.path.exists("/dev/nvidia0"):
return True
if os.path.exists("/dev/dri/renderD128") or os.path.exists("/dev/dri/card0"):
return True
return False
def chromium_launch_args():
if not detect_gpu_available():
return []
if os.name == "nt":
return ["--enable-gpu"]
return [
"--enable-gpu",
"--ignore-gpu-blocklist",
"--disable-software-rasterizer",
"--use-gl=egl",
"--enable-zero-copy",
"--enable-gpu-rasterization",
]
def parse_date(value):
for fmt in DATE_FORMATS:
try:
return datetime.strptime(value, fmt).date()
except ValueError:
continue
return None
def normalize_label(value):
return " ".join(value.strip().split()).lower()
def format_expiration_label(timestamp):
try:
return datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
except Exception:
return str(timestamp)
def format_percent(value):
if value is None:
return None
try:
return f"{value * 100:.2f}%"
except Exception:
return None
def extract_raw_value(value):
if isinstance(value, dict):
return value.get("raw")
return value
def extract_value(value):
if isinstance(value, dict):
if value.get("raw") is not None:
return value.get("raw")
return value.get("fmt")
return value
def extract_fmt_value(value):
if isinstance(value, dict):
return value.get("fmt")
return None
def format_percent_value(value):
fmt = extract_fmt_value(value)
if fmt is not None:
return fmt
return format_percent(extract_raw_value(value))
def format_last_trade_date(timestamp):
timestamp = extract_raw_value(timestamp)
if not timestamp:
return None
try:
return datetime.fromtimestamp(timestamp).strftime("%m/%d/%Y %I:%M %p") + " EST"
except Exception:
return None
def extract_option_chain_from_html(html):
if not html:
return None
token = "\"body\":\""
start = 0
while True:
idx = html.find(token, start)
if idx == -1:
break
i = idx + len(token)
escaped = False
raw_chars = []
while i < len(html):
ch = html[i]
if escaped:
raw_chars.append(ch)
escaped = False
else:
if ch == "\\":
raw_chars.append(ch)
escaped = True
elif ch == "\"":
break
else:
raw_chars.append(ch)
i += 1
raw = "".join(raw_chars)
try:
body_text = json.loads(f"\"{raw}\"")
except json.JSONDecodeError:
start = idx + len(token)
continue
if "optionChain" not in body_text:
start = idx + len(token)
continue
try:
payload = json.loads(body_text)
except json.JSONDecodeError:
start = idx + len(token)
continue
option_chain = payload.get("optionChain")
if option_chain and option_chain.get("result"):
return option_chain
start = idx + len(token)
return None
def extract_expiration_dates_from_chain(chain):
if not chain:
return []
result = chain.get("result", [])
if not result:
return []
return result[0].get("expirationDates", []) or []
def normalize_chain_rows(rows):
normalized = []
for row in rows or []:
normalized.append(
{
"Contract Name": row.get("contractSymbol"),
"Last Trade Date (EST)": format_last_trade_date(
row.get("lastTradeDate")
),
"Strike": extract_raw_value(row.get("strike")),
"Last Price": extract_raw_value(row.get("lastPrice")),
"Bid": extract_raw_value(row.get("bid")),
"Ask": extract_raw_value(row.get("ask")),
"Change": extract_raw_value(row.get("change")),
"% Change": format_percent_value(row.get("percentChange")),
"Volume": extract_raw_value(row.get("volume")),
"Open Interest": extract_raw_value(row.get("openInterest")),
"Implied Volatility": format_percent_value(
row.get("impliedVolatility")
),
}
)
return normalized
def build_rows_from_chain(chain):
result = chain.get("result", []) if chain else []
if not result:
return [], []
options = result[0].get("options", [])
if not options:
return [], []
option = options[0]
return (
normalize_chain_rows(option.get("calls")),
normalize_chain_rows(option.get("puts")),
)
def extract_contract_expiry_code(contract_name):
if not contract_name:
return None
match = re.search(r"(\d{6})", contract_name)
return match.group(1) if match else None
def expected_expiry_code(timestamp):
if not timestamp:
return None
try:
return datetime.utcfromtimestamp(timestamp).strftime("%y%m%d")
except Exception:
return None
def extract_expiration_dates_from_html(html):
if not html:
return []
patterns = (
r'\\"expirationDates\\":\[(.*?)\]',
r'"expirationDates":\[(.*?)\]',
)
match = None
for pattern in patterns:
match = re.search(pattern, html, re.DOTALL)
if match:
break
if not match:
return []
raw = match.group(1)
values = []
for part in raw.split(","):
part = part.strip()
if part.isdigit():
try:
values.append(int(part))
except Exception:
continue
return values
def build_expiration_options(expiration_dates):
options = []
for value in expiration_dates or []:
try:
value_int = int(value)
except Exception:
continue
label = format_expiration_label(value_int)
try:
date_value = datetime.utcfromtimestamp(value_int).date()
except Exception:
date_value = None
options.append({"value": value_int, "label": label, "date": date_value})
return sorted(options, key=lambda x: x["value"])
def resolve_expiration(expiration, options):
if not expiration:
return None, None
raw = expiration.strip()
if not raw:
return None, None
if raw.isdigit():
value = int(raw)
if options:
for opt in options:
if opt.get("value") == value:
return value, opt.get("label")
return None, None
return value, format_expiration_label(value)
requested_date = parse_date(raw)
if requested_date:
for opt in options:
if opt.get("date") == requested_date:
return opt.get("value"), opt.get("label")
return None, None
normalized = normalize_label(raw)
for opt in options:
if normalize_label(opt.get("label", "")) == normalized:
return opt.get("value"), opt.get("label")
return None, None
def wait_for_tables(page):
try:
page.wait_for_selector(
"section[data-testid='options-list-table'] table",
timeout=30000,
)
except Exception:
page.wait_for_selector("table", timeout=30000)
for _ in range(30): # 30 * 1s = 30 seconds
tables = page.query_selector_all(
"section[data-testid='options-list-table'] table"
)
if len(tables) >= 2:
return tables
tables = page.query_selector_all("table")
if len(tables) >= 2:
return tables
time.sleep(1)
return []
def parse_strike_limit(value, default=25):
if value is None:
return default
try:
limit = int(value)
except (TypeError, ValueError):
return default
return limit if limit > 0 else default
def parse_sveltekit_payload(raw_text):
if not raw_text:
return None
try:
outer = json.loads(raw_text)
except json.JSONDecodeError:
return None
body = outer.get("body")
if isinstance(body, str):
try:
body = json.loads(body)
except json.JSONDecodeError:
pass
return {
"status": outer.get("status"),
"statusText": outer.get("statusText"),
"body": body,
}
def extract_sveltekit_payloads_from_soup(soup):
payloads = {}
if soup is None:
return payloads
scripts = soup.select('script[type="application/json"][data-sveltekit-fetched]')
for script in scripts:
url = script.get("data-url")
if not url:
continue
url = html.unescape(url)
raw_text = script.string or script.get_text()
payload = parse_sveltekit_payload(raw_text)
if not payload:
continue
payloads[url] = payload
return payloads
def select_payload(payloads, needle, symbol=None):
if not payloads:
return None, None
needle = needle.lower()
symbol_token = symbol.lower() if symbol else None
fallback = None
for url, payload in payloads.items():
url_lower = url.lower()
if needle not in url_lower:
continue
if symbol_token:
if f"/{symbol_token}" in url_lower or f"symbols={symbol_token}" in url_lower:
return url, payload.get("body")
if fallback is None:
fallback = (url, payload.get("body"))
return fallback if fallback else (None, None)
def extract_quote_summary(payload):
if not payload:
return None
summary = payload.get("quoteSummary")
if not summary:
return None
result = summary.get("result") or []
return result[0] if result else None
def extract_quote_response(payload):
if not payload:
return None
response = payload.get("quoteResponse")
if not response:
return None
result = response.get("result") or []
return result[0] if result else None
def extract_quote_type(payload):
if not payload:
return None
quote_type = payload.get("quoteType")
if not quote_type:
return None
result = quote_type.get("result") or []
return result[0] if result else None
def extract_recent_news_from_soup(soup, limit=20):
items = []
if soup is None:
return items
container = soup.select_one('[data-testid="recent-news"]')
root = container if container else soup
seen = set()
for item in root.select('[data-testid="storyitem"]'):
title_el = item.select_one("h3")
link_el = item.select_one("a[href]")
if not title_el and not link_el:
continue
title = title_el.get_text(strip=True) if title_el else None
link = link_el.get("href") if link_el else None
publisher = None
published = None
publishing = item.select_one(".publishing")
if publishing:
text = " ".join(publishing.stripped_strings)
if "\u2022" in text:
parts = [part.strip() for part in text.split("\u2022", 1)]
publisher = parts[0] or None
published = parts[1] if len(parts) > 1 else None
else:
publisher = text or None
key = link or title
if key and key in seen:
continue
if key:
seen.add(key)
items.append(
{
"title": title,
"publisher": publisher,
"published": published,
"link": link,
}
)
if limit and len(items) >= limit:
break
return items
def extract_news_summary_from_soup(soup):
if soup is None:
return None
summary = soup.select_one('[data-testid="ticker-news-summary"]')
if not summary:
return None
text = " ".join(summary.stripped_strings)
return text if text else None
def build_profile_key_metrics(summary_detail, key_stats, financial_data, price_data, quote):
summary_detail = summary_detail or {}
key_stats = key_stats or {}
financial_data = financial_data or {}
price_data = price_data or {}
quote = quote or {}
def pick_value(*values):
for value in values:
if value is not None:
return value
return None
return {
"previous_close": extract_raw_value(summary_detail.get("previousClose")),
"open": extract_raw_value(summary_detail.get("open")),
"bid": extract_raw_value(summary_detail.get("bid")),
"ask": extract_raw_value(summary_detail.get("ask")),
"bid_size": extract_raw_value(summary_detail.get("bidSize")),
"ask_size": extract_raw_value(summary_detail.get("askSize")),
"day_low": extract_raw_value(summary_detail.get("dayLow")),
"day_high": extract_raw_value(summary_detail.get("dayHigh")),
"fifty_two_week_low": extract_raw_value(quote.get("fiftyTwoWeekLow")),
"fifty_two_week_high": extract_raw_value(quote.get("fiftyTwoWeekHigh")),
"volume": pick_value(
extract_raw_value(summary_detail.get("volume")),
extract_raw_value(price_data.get("regularMarketVolume")),
extract_raw_value(quote.get("regularMarketVolume")),
),
"average_volume": pick_value(
extract_raw_value(summary_detail.get("averageVolume")),
extract_raw_value(price_data.get("averageDailyVolume3Month")),
),
"market_cap": pick_value(
extract_raw_value(summary_detail.get("marketCap")),
extract_raw_value(quote.get("marketCap")),
),
"beta": pick_value(
extract_raw_value(summary_detail.get("beta")),
extract_raw_value(key_stats.get("beta")),
),
"trailing_pe": pick_value(
extract_raw_value(summary_detail.get("trailingPE")),
extract_raw_value(key_stats.get("trailingPE")),
),
"forward_pe": pick_value(
extract_raw_value(summary_detail.get("forwardPE")),
extract_raw_value(key_stats.get("forwardPE")),
),
"eps_trailing": extract_raw_value(key_stats.get("trailingEps")),
"eps_forward": extract_raw_value(key_stats.get("forwardEps")),
"dividend_rate": extract_raw_value(summary_detail.get("dividendRate")),
"dividend_yield": extract_raw_value(summary_detail.get("dividendYield")),
"ex_dividend_date": extract_raw_value(summary_detail.get("exDividendDate")),
"payout_ratio": extract_raw_value(summary_detail.get("payoutRatio")),
"implied_volatility": extract_raw_value(summary_detail.get("impliedVolatility")),
"current_price": pick_value(
extract_raw_value(price_data.get("regularMarketPrice")),
extract_raw_value(financial_data.get("currentPrice")),
extract_raw_value(quote.get("regularMarketPrice")),
),
"recommendation_key": financial_data.get("recommendationKey"),
"recommendation_mean": extract_raw_value(financial_data.get("recommendationMean")),
"target_price_high": extract_raw_value(financial_data.get("targetHighPrice")),
"target_price_low": extract_raw_value(financial_data.get("targetLowPrice")),
"target_price_mean": extract_raw_value(financial_data.get("targetMeanPrice")),
"target_price_median": extract_raw_value(financial_data.get("targetMedianPrice")),
"analyst_opinion_count": extract_raw_value(
financial_data.get("numberOfAnalystOpinions")
),
}
def simplify_recommendation_trend(trend):
simplified = []
for entry in trend or []:
simplified.append(
{
"period": entry.get("period"),
"strong_buy": entry.get("strongBuy"),
"buy": entry.get("buy"),
"hold": entry.get("hold"),
"sell": entry.get("sell"),
"strong_sell": entry.get("strongSell"),
}
)
return simplified
def simplify_upgrade_history(history, limit=20):
simplified = []
for entry in history or []:
simplified.append(
{
"firm": entry.get("firm"),
"action": entry.get("action"),
"from_grade": entry.get("fromGrade"),
"to_grade": entry.get("toGrade"),
"date": entry.get("epochGradeDate") or entry.get("gradeDate"),
}
)
if limit and len(simplified) >= limit:
break
return simplified
def simplify_ratings_top(payload):
if not payload:
return None
simplified = {}
for key, value in payload.items():
if not isinstance(value, dict):
continue
simplified[key] = {
"analyst": value.get("analyst"),
"rating_current": value.get("rating_current"),
"rating_sentiment": value.get("rating_sentiment"),
"pt_current": value.get("pt_current"),
"adjusted_pt_current": value.get("adjusted_pt_current"),
"announcement_date": value.get("announcement_date"),
"datapoints": value.get("datapoints"),
"scores": {
"dir": extract_value(value.get("dir")),
"mm": extract_value(value.get("mm")),
"pt": extract_value(value.get("pt")),
"fin_score": extract_value(value.get("fin_score")),
},
}
return simplified or None
def summarize_performance(perf_data):
if not perf_data:
return {}
overview = perf_data.get("performanceOverview")
if isinstance(overview, dict):
return {
"as_of_date": extract_value(overview.get("asOfDate")),
"returns": {
"five_day": extract_value(overview.get("fiveDaysReturn")),
"one_month": extract_value(overview.get("oneMonthReturn")),
"three_month": extract_value(overview.get("threeMonthReturn")),
"six_month": extract_value(overview.get("sixMonthReturn")),
"ytd": extract_value(overview.get("ytdReturnPct")),
"one_year": extract_value(overview.get("oneYearTotalReturn")),
"two_year": extract_value(overview.get("twoYearTotalReturn")),
"three_year": extract_value(overview.get("threeYearTotalReturn")),
"five_year": extract_value(overview.get("fiveYearTotalReturn")),
"ten_year": extract_value(overview.get("tenYearTotalReturn")),
"max": extract_value(overview.get("maxReturn")),
},
}
summary = []
for entry in overview or []:
if not isinstance(entry, dict):
continue
summary.append(
{
"period": entry.get("period"),
"performance": extract_value(entry.get("performance")),
"benchmark": extract_value(entry.get("benchmark")),
}
)
return {"periods": summary} if summary else {}
def summarize_earnings(earnings, calendar_events):
earnings = earnings or {}
calendar_events = calendar_events or {}
earnings_chart = earnings.get("earningsChart", {}) or {}
financials_chart = earnings.get("financialsChart", {}) or {}
calendar_earnings = calendar_events.get("earnings", {}) or {}
quarterly = []
for entry in earnings_chart.get("quarterly") or []:
quarterly.append(
{
"quarter": entry.get("date"),
"actual": extract_value(entry.get("actual")),
"estimate": extract_value(entry.get("estimate")),
"surprise": extract_value(entry.get("difference")),
"surprise_percent": extract_value(entry.get("surprisePct")),
}
)
yearly = []
for entry in financials_chart.get("yearly") or []:
yearly.append(
{
"year": entry.get("date"),
"revenue": extract_value(entry.get("revenue")),
"earnings": extract_value(entry.get("earnings")),
}
)
quarterly_financials = []
for entry in financials_chart.get("quarterly") or []:
quarterly_financials.append(
{
"quarter": entry.get("date"),
"revenue": extract_value(entry.get("revenue")),
"earnings": extract_value(entry.get("earnings")),
}
)
return {
"next_earnings_dates": [
extract_value(value) for value in calendar_earnings.get("earningsDate", []) or []
],
"is_earnings_date_estimate": calendar_earnings.get("isEarningsDateEstimate"),
"earnings_estimates": {
"average": extract_value(calendar_earnings.get("earningsAverage")),
"low": extract_value(calendar_earnings.get("earningsLow")),
"high": extract_value(calendar_earnings.get("earningsHigh")),
},
"revenue_estimates": {
"average": extract_value(calendar_earnings.get("revenueAverage")),
"low": extract_value(calendar_earnings.get("revenueLow")),
"high": extract_value(calendar_earnings.get("revenueHigh")),
},
"quarterly_earnings": quarterly[:4],
"yearly_financials": yearly[:4],
"quarterly_financials": quarterly_financials[:4],
"current_quarter_estimate": extract_value(
earnings_chart.get("currentQuarterEstimate")
),
"current_quarter_estimate_date": earnings_chart.get("currentQuarterEstimateDate"),
"current_calendar_quarter": earnings_chart.get("currentCalendarQuarter"),
"current_fiscal_quarter": earnings_chart.get("currentFiscalQuarter"),
}
def scrape_yahoo_profile(symbol):
encoded = urllib.parse.quote(symbol, safe="")
url = f"https://finance.yahoo.com/quote/{encoded}/"
app.logger.info("Starting profile scrape for symbol=%s url=%s", symbol, url)
response_html = None
rendered_html = None
payloads = {}
with sync_playwright() as p:
launch_args = chromium_launch_args()
if launch_args:
app.logger.info("GPU acceleration enabled")
else:
app.logger.info("GPU acceleration disabled")
browser = p.chromium.launch(headless=True, args=launch_args)
page = browser.new_page()
page.set_extra_http_headers(
{
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
)
}
)
page.set_default_timeout(60000)
try:
response = page.goto(url, wait_until="domcontentloaded", timeout=60000)
app.logger.info("Profile page loaded (domcontentloaded) for %s", symbol)
if response:
response_html = response.text()
else:
app.logger.warning("No response body for profile page %s", symbol)
page.wait_for_timeout(1000)
rendered_html = page.content()
finally:
browser.close()
if not response_html and not rendered_html:
return {"error": "Profile page content missing", "stock": symbol, "url": url}
payload_source = response_html or rendered_html
payload_soup = BeautifulSoup(payload_source, "html.parser") if payload_source else None
payloads = extract_sveltekit_payloads_from_soup(payload_soup)
if not payloads and rendered_html and rendered_html != payload_source:
fallback_soup = BeautifulSoup(rendered_html, "html.parser")
payloads = extract_sveltekit_payloads_from_soup(fallback_soup)
if not payloads:
return {
"error": "No embedded payloads found on profile page",
"stock": symbol,
"url": url,
}
quote_summary_url, quote_summary_payload = select_payload(
payloads, "quoteSummary", symbol
)
quote_url, quote_payload = select_payload(payloads, "v7/finance/quote?", symbol)
quote_type_url, quote_type_payload = select_payload(
payloads, "/v1/finance/quoteType/", symbol
)
ratings_url, ratings_payload = select_payload(payloads, "ratings/top", symbol)
recs_url, recs_payload = select_payload(
payloads, "recommendationsbysymbol", symbol
)
quote_summary = extract_quote_summary(quote_summary_payload)
quote = extract_quote_response(quote_payload)
quote_type = extract_quote_type(quote_type_payload)
summary_detail = quote_summary.get("summaryDetail", {}) if quote_summary else {}
key_stats = quote_summary.get("defaultKeyStatistics", {}) if quote_summary else {}
financial_data = quote_summary.get("financialData", {}) if quote_summary else {}
price_data = quote_summary.get("price", {}) if quote_summary else {}
recommendation_trend = (
quote_summary.get("recommendationTrend", {}) if quote_summary else {}
)
upgrade_history = (
quote_summary.get("upgradeDowngradeHistory", {}) if quote_summary else {}
)
earnings = quote_summary.get("earnings", {}) if quote_summary else {}
calendar_events = quote_summary.get("calendarEvents", {}) if quote_summary else {}
equity_performance = (
quote_summary.get("equityPerformance", {}) if quote_summary else {}
)
performance_overview = (
quote_summary.get("quoteUnadjustedPerformanceOverview", {})
if quote_summary
else {}
)
key_metrics = build_profile_key_metrics(
summary_detail, key_stats, financial_data, price_data, quote
)
valuation = {
"market_cap": extract_raw_value(key_stats.get("marketCap")),
"enterprise_value": extract_raw_value(key_stats.get("enterpriseValue")),
"price_to_book": extract_raw_value(key_stats.get("priceToBook")),
"price_to_sales": extract_raw_value(key_stats.get("priceToSalesTrailing12Months")),
"trailing_pe": key_metrics.get("trailing_pe"),
"forward_pe": key_metrics.get("forward_pe"),
}
profitability = {
"profit_margins": extract_raw_value(financial_data.get("profitMargins")),
"operating_margins": extract_raw_value(financial_data.get("operatingMargins")),
"gross_margins": extract_raw_value(financial_data.get("grossMargins")),
"ebitda_margins": extract_raw_value(financial_data.get("ebitdaMargins")),
"return_on_assets": extract_raw_value(financial_data.get("returnOnAssets")),
"return_on_equity": extract_raw_value(financial_data.get("returnOnEquity")),
}
growth = {
"revenue_growth": extract_raw_value(financial_data.get("revenueGrowth")),
"earnings_growth": extract_raw_value(financial_data.get("earningsGrowth")),
"revenue_per_share": extract_raw_value(financial_data.get("revenuePerShare")),
}
financial_strength = {
"total_cash": extract_raw_value(financial_data.get("totalCash")),
"total_debt": extract_raw_value(financial_data.get("totalDebt")),
"debt_to_equity": extract_raw_value(financial_data.get("debtToEquity")),
"current_ratio": extract_raw_value(financial_data.get("currentRatio")),
"quick_ratio": extract_raw_value(financial_data.get("quickRatio")),
}
cashflow = {
"operating_cashflow": extract_raw_value(financial_data.get("operatingCashflow")),
"free_cashflow": extract_raw_value(financial_data.get("freeCashflow")),
"ebitda": extract_raw_value(financial_data.get("ebitda")),
}
ownership = {
"shares_outstanding": extract_raw_value(key_stats.get("sharesOutstanding")),
"float_shares": extract_raw_value(key_stats.get("floatShares")),
"shares_short": extract_raw_value(key_stats.get("sharesShort")),
"short_ratio": extract_raw_value(key_stats.get("shortRatio")),
"short_percent_of_float": extract_raw_value(key_stats.get("shortPercentOfFloat")),
"held_percent_insiders": extract_raw_value(key_stats.get("heldPercentInsiders")),
"held_percent_institutions": extract_raw_value(
key_stats.get("heldPercentInstitutions")
),
}
analyst = {
"recommendation": {
"key": key_metrics.get("recommendation_key"),
"mean": key_metrics.get("recommendation_mean"),
"analyst_opinion_count": key_metrics.get("analyst_opinion_count"),
"target_price_high": key_metrics.get("target_price_high"),
"target_price_low": key_metrics.get("target_price_low"),
"target_price_mean": key_metrics.get("target_price_mean"),
"target_price_median": key_metrics.get("target_price_median"),
},
"trend": simplify_recommendation_trend(recommendation_trend.get("trend")),
"upgrades_downgrades": simplify_upgrade_history(
upgrade_history.get("history"), limit=20
),
"ratings_top": simplify_ratings_top(ratings_payload),
}
earnings_summary = summarize_earnings(earnings, calendar_events)
performance_summary = {
"equity_performance": summarize_performance(equity_performance),
"unadjusted_performance": summarize_performance(performance_overview),
}
matched_symbols = []
for candidate in [
price_data.get("symbol") if price_data else None,
quote.get("symbol") if quote else None,
quote_type.get("symbol") if quote_type else None,
]:
if candidate:
matched_symbols.append(candidate)
symbol_match = None
if matched_symbols:
symbol_match = any(
candidate.upper() == symbol.upper() for candidate in matched_symbols
)
issues = []
if not quote_summary:
issues.append("missing_quote_summary")
if matched_symbols and not symbol_match:
issues.append("symbol_mismatch")
if not quote:
issues.append("missing_quote_data")
if not quote_type:
issues.append("missing_quote_type")
validation = {
"requested_symbol": symbol,
"matched_symbols": matched_symbols,
"symbol_match": symbol_match,
"issues": issues,
}
if "missing_quote_summary" in issues or "symbol_mismatch" in issues:
return {
"error": "Profile validation failed",
"stock": symbol,
"url": url,
"validation": validation,
"data_sources": {
"quote_summary": quote_summary_url,
"quote": quote_url,
"quote_type": quote_type_url,
"ratings_top": ratings_url,
"recommendations": recs_url,
},
}
return {
"stock": symbol,
"url": url,
"fetched_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
"validation": validation,
"key_metrics": key_metrics,
"valuation": valuation,
"profitability": profitability,
"growth": growth,
"financial_strength": financial_strength,
"cashflow": cashflow,
"ownership": ownership,
"analyst": analyst,
"earnings": earnings_summary,
"performance": performance_summary,
"data_sources": {
"quote_summary": quote_summary_url,
"quote": quote_url,
"quote_type": quote_type_url,
"ratings_top": ratings_url,
"recommendations": recs_url,
},
}
def scrape_yahoo_options(symbol, expiration=None, strike_limit=25):
def parse_table(table_html, side):
if not table_html:
app.logger.warning("No %s table HTML for %s", side, symbol)
return []
soup = BeautifulSoup(table_html, "html.parser")
headers = [th.get_text(strip=True) for th in soup.select("thead th")]
rows = soup.select("tbody tr")
parsed = []
for r in rows:
tds = r.find_all("td")
if len(tds) != len(headers):
continue
item = {}
for i, c in enumerate(tds):
key = headers[i]
val = c.get_text(" ", strip=True)
# Convert numeric fields
if key in ["Strike", "Last Price", "Bid", "Ask", "Change"]:
try:
val = float(val.replace(",", ""))
except Exception:
val = None
elif key in ["Volume", "Open Interest"]:
try:
val = int(val.replace(",", ""))
except Exception:
val = None
elif val in ["-", ""]:
val = None
item[key] = val
parsed.append(item)
app.logger.info("Parsed %d %s rows", len(parsed), side)
return parsed
def read_option_chain(page):
html = page.content()
option_chain = extract_option_chain_from_html(html)
if option_chain:
expiration_dates = extract_expiration_dates_from_chain(option_chain)
else:
expiration_dates = extract_expiration_dates_from_html(html)
return option_chain, expiration_dates
def has_expected_expiry(options, expected_code):
if not expected_code:
return False
for row in options or []:
name = row.get("Contract Name")
if extract_contract_expiry_code(name) == expected_code:
return True
return False
encoded = urllib.parse.quote(symbol, safe="")
base_url = f"https://finance.yahoo.com/quote/{encoded}/options/"
requested_expiration = expiration.strip() if expiration else None
if not requested_expiration:
requested_expiration = None
url = base_url
app.logger.info(
"Starting scrape for symbol=%s expiration=%s url=%s",
symbol,
requested_expiration,
base_url,
)
calls_html = None
puts_html = None
calls_full = []
puts_full = []
price = None
selected_expiration_value = None
selected_expiration_label = None
expiration_options = []
target_date = None
fallback_to_base = False
with sync_playwright() as p:
launch_args = chromium_launch_args()
if launch_args:
app.logger.info("GPU acceleration enabled")
else:
app.logger.info("GPU acceleration disabled")
browser = p.chromium.launch(headless=True, args=launch_args)
page = browser.new_page()
page.set_extra_http_headers(
{
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
)
}
)
page.set_default_timeout(60000)
try:
if requested_expiration:
if requested_expiration.isdigit():
target_date = int(requested_expiration)
selected_expiration_value = target_date
selected_expiration_label = format_expiration_label(target_date)
else:
parsed_date = parse_date(requested_expiration)
if parsed_date:
target_date = int(
datetime(
parsed_date.year,
parsed_date.month,
parsed_date.day,
tzinfo=timezone.utc,
).timestamp()
)
selected_expiration_value = target_date
selected_expiration_label = format_expiration_label(target_date)
else:
fallback_to_base = True
if target_date:
url = f"{base_url}?date={target_date}"
page.goto(url, wait_until="domcontentloaded", timeout=60000)
app.logger.info("Page loaded (domcontentloaded) for %s", symbol)
option_chain, expiration_dates = read_option_chain(page)
app.logger.info("Option chain found: %s", bool(option_chain))
expiration_options = build_expiration_options(expiration_dates)
if fallback_to_base:
resolved_value, resolved_label = resolve_expiration(
requested_expiration, expiration_options
)
if resolved_value is None:
return {
"error": "Requested expiration not available",
"stock": symbol,
"requested_expiration": requested_expiration,
"available_expirations": [
{"label": opt.get("label"), "value": opt.get("value")}
for opt in expiration_options
],
}
target_date = resolved_value
selected_expiration_value = resolved_value
selected_expiration_label = resolved_label or format_expiration_label(
resolved_value
)
url = f"{base_url}?date={resolved_value}"
page.goto(url, wait_until="domcontentloaded", timeout=60000)
app.logger.info("Page loaded (domcontentloaded) for %s", symbol)
option_chain, expiration_dates = read_option_chain(page)
expiration_options = build_expiration_options(expiration_dates)
if target_date and expiration_options:
matched = None
for opt in expiration_options:
if opt.get("value") == target_date:
matched = opt
break
if not matched:
return {
"error": "Requested expiration not available",
"stock": symbol,
"requested_expiration": requested_expiration,
"available_expirations": [
{"label": opt.get("label"), "value": opt.get("value")}
for opt in expiration_options
],
}
selected_expiration_value = matched.get("value")
selected_expiration_label = matched.get("label")
elif expiration_options and not target_date:
selected_expiration_value = expiration_options[0].get("value")
selected_expiration_label = expiration_options[0].get("label")
calls_full, puts_full = build_rows_from_chain(option_chain)
app.logger.info(
"Option chain rows: calls=%d puts=%d",
len(calls_full),
len(puts_full),
)
if not calls_full and not puts_full:
app.logger.info("Waiting for options tables...")
tables = wait_for_tables(page)
if len(tables) < 2:
app.logger.error(
"Only %d tables found; expected 2. HTML may have changed.",
len(tables),
)
return {"error": "Could not locate options tables", "stock": symbol}
app.logger.info("Found %d tables. Extracting Calls & Puts.", len(tables))
calls_html = tables[0].evaluate("el => el.outerHTML")
puts_html = tables[1].evaluate("el => el.outerHTML")
# --- Extract current price ---
try:
# Primary selector
price_text = page.locator(
"fin-streamer[data-field='regularMarketPrice']"
).inner_text()
price = float(price_text.replace(",", ""))
except Exception:
try:
# Fallback
price_text = page.locator("span[data-testid='qsp-price']").inner_text()
price = float(price_text.replace(",", ""))
except Exception as e:
app.logger.warning("Failed to extract price for %s: %s", symbol, e)
app.logger.info("Current price for %s = %s", symbol, price)
finally:
browser.close()
if not calls_full and not puts_full and calls_html and puts_html:
calls_full = parse_table(calls_html, "calls")
puts_full = parse_table(puts_html, "puts")
expected_code = expected_expiry_code(target_date)
if expected_code:
if not has_expected_expiry(calls_full, expected_code) and not has_expected_expiry(
puts_full, expected_code
):
return {
"error": "Options chain does not match requested expiration",
"stock": symbol,
"requested_expiration": requested_expiration,
"expected_expiration_code": expected_code,
"selected_expiration": {
"value": selected_expiration_value,
"label": selected_expiration_label,
},
}
# ----------------------------------------------------------------------
# Pruning logic
# ----------------------------------------------------------------------
def prune_nearest(options, price_value, limit=25, side=""):
if price_value is None:
return options, 0
numeric = [o for o in options if isinstance(o.get("Strike"), (int, float))]
if len(numeric) <= limit:
return numeric, 0
sorted_opts = sorted(numeric, key=lambda x: abs(x["Strike"] - price_value))
pruned = sorted_opts[:limit]
pruned_count = len(options) - len(pruned)
return pruned, pruned_count
calls, pruned_calls = prune_nearest(
calls_full,
price,
limit=strike_limit,
side="calls",
)
puts, pruned_puts = prune_nearest(
puts_full,
price,
limit=strike_limit,
side="puts",
)
def strike_range(opts):
strikes = [o["Strike"] for o in opts if isinstance(o.get("Strike"), (int, float))]
return [min(strikes), max(strikes)] if strikes else [None, None]
return {
"stock": symbol,
"url": url,
"requested_expiration": requested_expiration,
"selected_expiration": {
"value": selected_expiration_value,
"label": selected_expiration_label,
},
"current_price": price,
"calls": calls,
"puts": puts,
"calls_strike_range": strike_range(calls),
"puts_strike_range": strike_range(puts),
"total_calls": len(calls),
"total_puts": len(puts),
"pruned_calls_count": pruned_calls,
"pruned_puts_count": pruned_puts,
}
@app.route("/scrape_sync")
def scrape_sync():
symbol = request.args.get("stock")
if not symbol:
return jsonify({"error": "Missing 'stock' parameter"}), 400
expiration = (
request.args.get("expiration")
or request.args.get("expiry")
or request.args.get("date")
)
strike_limit = parse_strike_limit(request.args.get("strikeLimit"), default=25)
app.logger.info(
"Received /scrape_sync request for symbol=%s expiration=%s strike_limit=%s",
symbol,
expiration,
strike_limit,
)
return jsonify(scrape_yahoo_options(symbol, expiration, strike_limit))
@app.route("/profile")
def profile():
symbol = request.args.get("stock")
if not symbol:
return jsonify({"error": "Missing 'stock' parameter"}), 400
app.logger.info("Received /profile request for symbol=%s", symbol)
return jsonify(scrape_yahoo_profile(symbol))
if __name__ == "__main__":
app.run(host="0.0.0.0", port=9777)