from flask import Flask, jsonify, request
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from datetime import datetime, timezone
import html
import urllib.parse
import logging
import json
import re
import time
import os

app = Flask(__name__)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
app.logger.setLevel(logging.INFO)

DATE_FORMATS = (
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y%m%d",
    "%b %d, %Y",
    "%B %d, %Y",
)

GPU_ACCEL_ENV = "ENABLE_GPU"


def parse_env_flag(value, default=False):
    if value is None:
        return default
    return str(value).strip().lower() in ("1", "true", "yes", "on")


def detect_gpu_available():
    env_value = os.getenv(GPU_ACCEL_ENV)
    if env_value is not None:
        return parse_env_flag(env_value, default=False)
    nvidia_visible = os.getenv("NVIDIA_VISIBLE_DEVICES")
    if nvidia_visible and nvidia_visible.lower() not in ("none", "void", "off"):
        return True
    if os.path.exists("/dev/nvidia0"):
        return True
    if os.path.exists("/dev/dri/renderD128") or os.path.exists("/dev/dri/card0"):
        return True
    return False


def chromium_launch_args():
    if not detect_gpu_available():
        return []
    if os.name == "nt":
        return ["--enable-gpu"]
    return [
        "--enable-gpu",
        "--ignore-gpu-blocklist",
        "--disable-software-rasterizer",
        "--use-gl=egl",
        "--enable-zero-copy",
        "--enable-gpu-rasterization",
    ]


def parse_date(value):
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            continue
    return None


def normalize_label(value):
    return " ".join(value.strip().split()).lower()


def format_expiration_label(timestamp):
    try:
        return datetime.utcfromtimestamp(timestamp).strftime("%Y-%m-%d")
    except Exception:
        return str(timestamp)


def format_percent(value):
    if value is None:
        return None
    try:
        return f"{value * 100:.2f}%"
    except Exception:
        return None


def extract_raw_value(value):
    if isinstance(value, dict):
        return value.get("raw")
    return value


def extract_value(value):
    if isinstance(value, dict):
        if value.get("raw") is not None:
            return value.get("raw")
        return value.get("fmt")
    return value


def extract_fmt_value(value):
    if isinstance(value, dict):
        return value.get("fmt")
    return None


def format_percent_value(value):
    fmt = extract_fmt_value(value)
    if fmt is not None:
        return fmt
    return format_percent(extract_raw_value(value))


def format_last_trade_date(timestamp):
    timestamp = extract_raw_value(timestamp)
    if not timestamp:
        return None
    try:
        # Naive local-time conversion; the "EST" suffix mirrors Yahoo's display.
        return datetime.fromtimestamp(timestamp).strftime("%m/%d/%Y %I:%M %p") + " EST"
    except Exception:
        return None


def extract_option_chain_from_html(html_text):
    # Scan for JSON bodies embedded as escaped strings ("body":"...") and
    # decode the first one that contains an optionChain payload.
    # (Parameter renamed from `html` to avoid shadowing the imported module.)
    if not html_text:
        return None
    token = "\"body\":\""
    start = 0
    while True:
        idx = html_text.find(token, start)
        if idx == -1:
            break
        i = idx + len(token)
        escaped = False
        raw_chars = []
        while i < len(html_text):
            ch = html_text[i]
            if escaped:
                raw_chars.append(ch)
                escaped = False
            else:
                if ch == "\\":
                    raw_chars.append(ch)
                    escaped = True
                elif ch == "\"":
                    break
                else:
                    raw_chars.append(ch)
            i += 1
        raw = "".join(raw_chars)
        try:
            body_text = json.loads(f"\"{raw}\"")
        except json.JSONDecodeError:
            start = idx + len(token)
            continue
        if "optionChain" not in body_text:
            start = idx + len(token)
            continue
        try:
            payload = json.loads(body_text)
        except json.JSONDecodeError:
            start = idx + len(token)
            continue
        option_chain = payload.get("optionChain")
        if option_chain and option_chain.get("result"):
            return option_chain
        start = idx + len(token)
    return None

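
# A minimal sketch of the input extract_option_chain_from_html targets
# (hypothetical, double-escaped as it appears in the page source):
#
#     ... "body":"{\"optionChain\":{\"result\":[{\"expirationDates\":[...]}]}}" ...
#
# The scanner unescapes the inner string via json.loads, parses it again as
# JSON, and returns the optionChain dict if its "result" list is non-empty.
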

def extract_expiration_dates_from_chain(chain):
    if not chain:
        return []
    result = chain.get("result", [])
    if not result:
        return []
    return result[0].get("expirationDates", []) or []


def normalize_chain_rows(rows):
    normalized = []
    for row in rows or []:
        normalized.append(
            {
                "Contract Name": row.get("contractSymbol"),
                "Last Trade Date (EST)": format_last_trade_date(
                    row.get("lastTradeDate")
                ),
                "Strike": extract_raw_value(row.get("strike")),
                "Last Price": extract_raw_value(row.get("lastPrice")),
                "Bid": extract_raw_value(row.get("bid")),
                "Ask": extract_raw_value(row.get("ask")),
                "Change": extract_raw_value(row.get("change")),
                "% Change": format_percent_value(row.get("percentChange")),
                "Volume": extract_raw_value(row.get("volume")),
                "Open Interest": extract_raw_value(row.get("openInterest")),
                "Implied Volatility": format_percent_value(
                    row.get("impliedVolatility")
                ),
            }
        )
    return normalized


def build_rows_from_chain(chain):
    result = chain.get("result", []) if chain else []
    if not result:
        return [], []
    options = result[0].get("options", [])
    if not options:
        return [], []
    option = options[0]
    return (
        normalize_chain_rows(option.get("calls")),
        normalize_chain_rows(option.get("puts")),
    )


def extract_contract_expiry_code(contract_name):
    if not contract_name:
        return None
    match = re.search(r"(\d{6})", contract_name)
    return match.group(1) if match else None


def expected_expiry_code(timestamp):
    if not timestamp:
        return None
    try:
        return datetime.utcfromtimestamp(timestamp).strftime("%y%m%d")
    except Exception:
        return None


def extract_expiration_dates_from_html(html_text):
    if not html_text:
        return []
    patterns = (
        r'\\"expirationDates\\":\[(.*?)\]',
        r'"expirationDates":\[(.*?)\]',
    )
    match = None
    for pattern in patterns:
        match = re.search(pattern, html_text, re.DOTALL)
        if match:
            break
    if not match:
        return []
    raw = match.group(1)
    values = []
    for part in raw.split(","):
        part = part.strip()
        if part.isdigit():
            try:
                values.append(int(part))
            except Exception:
                continue
    return values


def build_expiration_options(expiration_dates):
    options = []
    for value in expiration_dates or []:
        try:
            value_int = int(value)
        except Exception:
            continue
        label = format_expiration_label(value_int)
        try:
            date_value = datetime.utcfromtimestamp(value_int).date()
        except Exception:
            date_value = None
        options.append({"value": value_int, "label": label, "date": date_value})
    return sorted(options, key=lambda x: x["value"])


def resolve_expiration(expiration, options):
    if not expiration:
        return None, None
    raw = expiration.strip()
    if not raw:
        return None, None
    if raw.isdigit():
        value = int(raw)
        if options:
            for opt in options:
                if opt.get("value") == value:
                    return value, opt.get("label")
            return None, None
        return value, format_expiration_label(value)
    requested_date = parse_date(raw)
    if requested_date:
        for opt in options:
            if opt.get("date") == requested_date:
                return opt.get("value"), opt.get("label")
        return None, None
    normalized = normalize_label(raw)
    for opt in options:
        if normalize_label(opt.get("label", "")) == normalized:
            return opt.get("value"), opt.get("label")
    return None, None


def wait_for_tables(page):
    try:
        page.wait_for_selector(
            "section[data-testid='options-list-table'] table",
            timeout=30000,
        )
    except Exception:
        page.wait_for_selector("table", timeout=30000)
    for _ in range(30):  # 30 * 1s = 30 seconds
        tables = page.query_selector_all(
            "section[data-testid='options-list-table'] table"
        )
        if len(tables) >= 2:
            return tables
        tables = page.query_selector_all("table")
        if len(tables) >= 2:
            return tables
        time.sleep(1)
    return []


def parse_strike_limit(value, default=25):
    if value is None:
        return default
    try:
        limit = int(value)
    except (TypeError, ValueError):
        return default
    return limit if limit > 0 else default

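
# Behavior sketch for parse_strike_limit (doctest-style; values follow from
# the code above):
#
#     >>> parse_strike_limit("10")
#     10
#     >>> parse_strike_limit("abc")  # unparseable -> default
#     25
#     >>> parse_strike_limit("-5")   # non-positive -> default
#     25
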

def parse_sveltekit_payload(raw_text):
    if not raw_text:
        return None
    try:
        outer = json.loads(raw_text)
    except json.JSONDecodeError:
        return None
    body = outer.get("body")
    if isinstance(body, str):
        try:
            body = json.loads(body)
        except json.JSONDecodeError:
            pass
    return {
        "status": outer.get("status"),
        "statusText": outer.get("statusText"),
        "body": body,
    }


def extract_sveltekit_payloads_from_soup(soup):
    payloads = {}
    if soup is None:
        return payloads
    scripts = soup.select('script[type="application/json"][data-sveltekit-fetched]')
    for script in scripts:
        url = script.get("data-url")
        if not url:
            continue
        url = html.unescape(url)
        raw_text = script.string or script.get_text()
        payload = parse_sveltekit_payload(raw_text)
        if not payload:
            continue
        payloads[url] = payload
    return payloads


def select_payload(payloads, needle, symbol=None):
    if not payloads:
        return None, None
    needle = needle.lower()
    symbol_token = symbol.lower() if symbol else None
    fallback = None
    for url, payload in payloads.items():
        url_lower = url.lower()
        if needle not in url_lower:
            continue
        if symbol_token:
            if f"/{symbol_token}" in url_lower or f"symbols={symbol_token}" in url_lower:
                return url, payload.get("body")
        if fallback is None:
            fallback = (url, payload.get("body"))
    return fallback if fallback else (None, None)


def extract_quote_summary(payload):
    if not payload:
        return None
    summary = payload.get("quoteSummary")
    if not summary:
        return None
    result = summary.get("result") or []
    return result[0] if result else None


def extract_quote_response(payload):
    if not payload:
        return None
    response = payload.get("quoteResponse")
    if not response:
        return None
    result = response.get("result") or []
    return result[0] if result else None


def extract_quote_type(payload):
    if not payload:
        return None
    quote_type = payload.get("quoteType")
    if not quote_type:
        return None
    result = quote_type.get("result") or []
    return result[0] if result else None


def extract_recent_news_from_soup(soup, limit=20):
    items = []
    if soup is None:
        return items
    container = soup.select_one('[data-testid="recent-news"]')
    root = container if container else soup
    seen = set()
    for item in root.select('[data-testid="storyitem"]'):
        title_el = item.select_one("h3")
        link_el = item.select_one("a[href]")
        if not title_el and not link_el:
            continue
        title = title_el.get_text(strip=True) if title_el else None
        link = link_el.get("href") if link_el else None
        publisher = None
        published = None
        publishing = item.select_one(".publishing")
        if publishing:
            text = " ".join(publishing.stripped_strings)
            if "\u2022" in text:
                parts = [part.strip() for part in text.split("\u2022", 1)]
                publisher = parts[0] or None
                published = parts[1] if len(parts) > 1 else None
            else:
                publisher = text or None
        key = link or title
        if key and key in seen:
            continue
        if key:
            seen.add(key)
        items.append(
            {
                "title": title,
                "publisher": publisher,
                "published": published,
                "link": link,
            }
        )
        if limit and len(items) >= limit:
            break
    return items


def extract_news_summary_from_soup(soup):
    if soup is None:
        return None
    summary = soup.select_one('[data-testid="ticker-news-summary"]')
    if not summary:
        return None
    text = " ".join(summary.stripped_strings)
    return text if text else None

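
# extract_sveltekit_payloads_from_soup targets markup of roughly this shape
# (illustrative; real pages embed full Yahoo API URLs and JSON bodies):
#
#     <script type="application/json" data-sveltekit-fetched
#             data-url="https://.../v10/finance/quoteSummary/...">
#         {"status": 200, "statusText": "OK", "body": "{\"quoteSummary\": ...}"}
#     </script>
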

def build_profile_key_metrics(summary_detail, key_stats, financial_data, price_data, quote):
    summary_detail = summary_detail or {}
    key_stats = key_stats or {}
    financial_data = financial_data or {}
    price_data = price_data or {}
    quote = quote or {}

    def pick_value(*values):
        # Return the first non-None candidate, preserving falsy-but-real
        # values such as 0.
        for value in values:
            if value is not None:
                return value
        return None

    return {
        "previous_close": extract_raw_value(summary_detail.get("previousClose")),
        "open": extract_raw_value(summary_detail.get("open")),
        "bid": extract_raw_value(summary_detail.get("bid")),
        "ask": extract_raw_value(summary_detail.get("ask")),
        "bid_size": extract_raw_value(summary_detail.get("bidSize")),
        "ask_size": extract_raw_value(summary_detail.get("askSize")),
        "day_low": extract_raw_value(summary_detail.get("dayLow")),
        "day_high": extract_raw_value(summary_detail.get("dayHigh")),
        "fifty_two_week_low": extract_raw_value(quote.get("fiftyTwoWeekLow")),
        "fifty_two_week_high": extract_raw_value(quote.get("fiftyTwoWeekHigh")),
        "volume": pick_value(
            extract_raw_value(summary_detail.get("volume")),
            extract_raw_value(price_data.get("regularMarketVolume")),
            extract_raw_value(quote.get("regularMarketVolume")),
        ),
        "average_volume": pick_value(
            extract_raw_value(summary_detail.get("averageVolume")),
            extract_raw_value(price_data.get("averageDailyVolume3Month")),
        ),
        "market_cap": pick_value(
            extract_raw_value(summary_detail.get("marketCap")),
            extract_raw_value(quote.get("marketCap")),
        ),
        "beta": pick_value(
            extract_raw_value(summary_detail.get("beta")),
            extract_raw_value(key_stats.get("beta")),
        ),
        "trailing_pe": pick_value(
            extract_raw_value(summary_detail.get("trailingPE")),
            extract_raw_value(key_stats.get("trailingPE")),
        ),
        "forward_pe": pick_value(
            extract_raw_value(summary_detail.get("forwardPE")),
            extract_raw_value(key_stats.get("forwardPE")),
        ),
        "eps_trailing": extract_raw_value(key_stats.get("trailingEps")),
        "eps_forward": extract_raw_value(key_stats.get("forwardEps")),
        "dividend_rate": extract_raw_value(summary_detail.get("dividendRate")),
        "dividend_yield": extract_raw_value(summary_detail.get("dividendYield")),
        "ex_dividend_date": extract_raw_value(summary_detail.get("exDividendDate")),
        "payout_ratio": extract_raw_value(summary_detail.get("payoutRatio")),
        "implied_volatility": extract_raw_value(summary_detail.get("impliedVolatility")),
        "current_price": pick_value(
            extract_raw_value(price_data.get("regularMarketPrice")),
            extract_raw_value(financial_data.get("currentPrice")),
            extract_raw_value(quote.get("regularMarketPrice")),
        ),
        "recommendation_key": financial_data.get("recommendationKey"),
        "recommendation_mean": extract_raw_value(financial_data.get("recommendationMean")),
        "target_price_high": extract_raw_value(financial_data.get("targetHighPrice")),
        "target_price_low": extract_raw_value(financial_data.get("targetLowPrice")),
        "target_price_mean": extract_raw_value(financial_data.get("targetMeanPrice")),
        "target_price_median": extract_raw_value(financial_data.get("targetMedianPrice")),
        "analyst_opinion_count": extract_raw_value(
            financial_data.get("numberOfAnalystOpinions")
        ),
    }


def simplify_recommendation_trend(trend):
    simplified = []
    for entry in trend or []:
        simplified.append(
            {
                "period": entry.get("period"),
                "strong_buy": entry.get("strongBuy"),
                "buy": entry.get("buy"),
                "hold": entry.get("hold"),
                "sell": entry.get("sell"),
                "strong_sell": entry.get("strongSell"),
            }
        )
    return simplified


def simplify_upgrade_history(history, limit=20):
    simplified = []
    for entry in history or []:
        simplified.append(
            {
                "firm": entry.get("firm"),
                "action": entry.get("action"),
                "from_grade": entry.get("fromGrade"),
                "to_grade": entry.get("toGrade"),
                "date": entry.get("epochGradeDate") or entry.get("gradeDate"),
            }
        )
        if limit and len(simplified) >= limit:
            break
    return simplified

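
# Output sketch for simplify_upgrade_history (values illustrative; the shape
# follows the dict built above):
#
#     [{"firm": "Example Capital", "action": "up",
#       "from_grade": "Hold", "to_grade": "Buy", "date": 1700000000}]
#
# "date" prefers epochGradeDate (a Unix timestamp) and falls back to gradeDate.
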

def simplify_ratings_top(payload):
    if not payload:
        return None
    simplified = {}
    for key, value in payload.items():
        if not isinstance(value, dict):
            continue
        simplified[key] = {
            "analyst": value.get("analyst"),
            "rating_current": value.get("rating_current"),
            "rating_sentiment": value.get("rating_sentiment"),
            "pt_current": value.get("pt_current"),
            "adjusted_pt_current": value.get("adjusted_pt_current"),
            "announcement_date": value.get("announcement_date"),
            "datapoints": value.get("datapoints"),
            "scores": {
                "dir": extract_value(value.get("dir")),
                "mm": extract_value(value.get("mm")),
                "pt": extract_value(value.get("pt")),
                "fin_score": extract_value(value.get("fin_score")),
            },
        }
    return simplified or None


def summarize_performance(perf_data):
    if not perf_data:
        return {}
    overview = perf_data.get("performanceOverview")
    if isinstance(overview, dict):
        return {
            "as_of_date": extract_value(overview.get("asOfDate")),
            "returns": {
                "five_day": extract_value(overview.get("fiveDaysReturn")),
                "one_month": extract_value(overview.get("oneMonthReturn")),
                "three_month": extract_value(overview.get("threeMonthReturn")),
                "six_month": extract_value(overview.get("sixMonthReturn")),
                "ytd": extract_value(overview.get("ytdReturnPct")),
                "one_year": extract_value(overview.get("oneYearTotalReturn")),
                "two_year": extract_value(overview.get("twoYearTotalReturn")),
                "three_year": extract_value(overview.get("threeYearTotalReturn")),
                "five_year": extract_value(overview.get("fiveYearTotalReturn")),
                "ten_year": extract_value(overview.get("tenYearTotalReturn")),
                "max": extract_value(overview.get("maxReturn")),
            },
        }
    summary = []
    for entry in overview or []:
        if not isinstance(entry, dict):
            continue
        summary.append(
            {
                "period": entry.get("period"),
                "performance": extract_value(entry.get("performance")),
                "benchmark": extract_value(entry.get("benchmark")),
            }
        )
    return {"periods": summary} if summary else {}

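
# summarize_performance accepts two payload shapes, per the branches above: a
# dict "performanceOverview" (keyed returns) or a list of period entries.
# Output sketches (values illustrative):
#
#     {"as_of_date": ..., "returns": {"ytd": 0.05, ...}}          # dict input
#     {"periods": [{"period": "1Y", "performance": 0.12, ...}]}   # list input
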

def summarize_earnings(earnings, calendar_events):
    earnings = earnings or {}
    calendar_events = calendar_events or {}
    earnings_chart = earnings.get("earningsChart", {}) or {}
    financials_chart = earnings.get("financialsChart", {}) or {}
    calendar_earnings = calendar_events.get("earnings", {}) or {}
    quarterly = []
    for entry in earnings_chart.get("quarterly") or []:
        quarterly.append(
            {
                "quarter": entry.get("date"),
                "actual": extract_value(entry.get("actual")),
                "estimate": extract_value(entry.get("estimate")),
                "surprise": extract_value(entry.get("difference")),
                "surprise_percent": extract_value(entry.get("surprisePct")),
            }
        )
    yearly = []
    for entry in financials_chart.get("yearly") or []:
        yearly.append(
            {
                "year": entry.get("date"),
                "revenue": extract_value(entry.get("revenue")),
                "earnings": extract_value(entry.get("earnings")),
            }
        )
    quarterly_financials = []
    for entry in financials_chart.get("quarterly") or []:
        quarterly_financials.append(
            {
                "quarter": entry.get("date"),
                "revenue": extract_value(entry.get("revenue")),
                "earnings": extract_value(entry.get("earnings")),
            }
        )
    return {
        "next_earnings_dates": [
            extract_value(value)
            for value in calendar_earnings.get("earningsDate", []) or []
        ],
        "is_earnings_date_estimate": calendar_earnings.get("isEarningsDateEstimate"),
        "earnings_estimates": {
            "average": extract_value(calendar_earnings.get("earningsAverage")),
            "low": extract_value(calendar_earnings.get("earningsLow")),
            "high": extract_value(calendar_earnings.get("earningsHigh")),
        },
        "revenue_estimates": {
            "average": extract_value(calendar_earnings.get("revenueAverage")),
            "low": extract_value(calendar_earnings.get("revenueLow")),
            "high": extract_value(calendar_earnings.get("revenueHigh")),
        },
        "quarterly_earnings": quarterly[:4],
        "yearly_financials": yearly[:4],
        "quarterly_financials": quarterly_financials[:4],
        "current_quarter_estimate": extract_value(
            earnings_chart.get("currentQuarterEstimate")
        ),
        "current_quarter_estimate_date": earnings_chart.get("currentQuarterEstimateDate"),
        "current_calendar_quarter": earnings_chart.get("currentCalendarQuarter"),
        "current_fiscal_quarter": earnings_chart.get("currentFiscalQuarter"),
    }

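
# Note on the raw/fmt convention used throughout: Yahoo payload fields often
# arrive as {"raw": <number>, "fmt": "<display string>"}. extract_value
# prefers the numeric "raw" and falls back to "fmt" (doctest-style sketch):
#
#     >>> extract_value({"raw": 0.1234, "fmt": "12.34%"})
#     0.1234
#     >>> extract_value({"raw": None, "fmt": "N/A"})
#     'N/A'
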

def scrape_yahoo_profile(symbol):
    encoded = urllib.parse.quote(symbol, safe="")
    url = f"https://finance.yahoo.com/quote/{encoded}/"
    app.logger.info("Starting profile scrape for symbol=%s url=%s", symbol, url)
    response_html = None
    rendered_html = None
    payloads = {}
    with sync_playwright() as p:
        launch_args = chromium_launch_args()
        if launch_args:
            app.logger.info("GPU acceleration enabled")
        else:
            app.logger.info("GPU acceleration disabled")
        browser = p.chromium.launch(headless=True, args=launch_args)
        page = browser.new_page()
        page.set_extra_http_headers(
            {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
                )
            }
        )
        page.set_default_timeout(60000)
        try:
            response = page.goto(url, wait_until="domcontentloaded", timeout=60000)
            app.logger.info("Profile page loaded (domcontentloaded) for %s", symbol)
            if response:
                response_html = response.text()
            else:
                app.logger.warning("No response body for profile page %s", symbol)
            page.wait_for_timeout(1000)
            rendered_html = page.content()
        finally:
            browser.close()
    if not response_html and not rendered_html:
        return {"error": "Profile page content missing", "stock": symbol, "url": url}
    payload_source = response_html or rendered_html
    payload_soup = BeautifulSoup(payload_source, "html.parser") if payload_source else None
    payloads = extract_sveltekit_payloads_from_soup(payload_soup)
    if not payloads and rendered_html and rendered_html != payload_source:
        fallback_soup = BeautifulSoup(rendered_html, "html.parser")
        payloads = extract_sveltekit_payloads_from_soup(fallback_soup)
    if not payloads:
        return {
            "error": "No embedded payloads found on profile page",
            "stock": symbol,
            "url": url,
        }
    quote_summary_url, quote_summary_payload = select_payload(
        payloads, "quoteSummary", symbol
    )
    quote_url, quote_payload = select_payload(payloads, "v7/finance/quote?", symbol)
    quote_type_url, quote_type_payload = select_payload(
        payloads, "/v1/finance/quoteType/", symbol
    )
    ratings_url, ratings_payload = select_payload(payloads, "ratings/top", symbol)
    recs_url, recs_payload = select_payload(
        payloads, "recommendationsbysymbol", symbol
    )
    quote_summary = extract_quote_summary(quote_summary_payload)
    quote = extract_quote_response(quote_payload)
    quote_type = extract_quote_type(quote_type_payload)
    summary_detail = quote_summary.get("summaryDetail", {}) if quote_summary else {}
    key_stats = quote_summary.get("defaultKeyStatistics", {}) if quote_summary else {}
    financial_data = quote_summary.get("financialData", {}) if quote_summary else {}
    price_data = quote_summary.get("price", {}) if quote_summary else {}
    recommendation_trend = (
        quote_summary.get("recommendationTrend", {}) if quote_summary else {}
    )
    upgrade_history = (
        quote_summary.get("upgradeDowngradeHistory", {}) if quote_summary else {}
    )
    earnings = quote_summary.get("earnings", {}) if quote_summary else {}
    calendar_events = quote_summary.get("calendarEvents", {}) if quote_summary else {}
    equity_performance = (
        quote_summary.get("equityPerformance", {}) if quote_summary else {}
    )
    performance_overview = (
        quote_summary.get("quoteUnadjustedPerformanceOverview", {})
        if quote_summary
        else {}
    )
    key_metrics = build_profile_key_metrics(
        summary_detail, key_stats, financial_data, price_data, quote
    )
    valuation = {
        "market_cap": extract_raw_value(key_stats.get("marketCap")),
        "enterprise_value": extract_raw_value(key_stats.get("enterpriseValue")),
        "price_to_book": extract_raw_value(key_stats.get("priceToBook")),
        "price_to_sales": extract_raw_value(key_stats.get("priceToSalesTrailing12Months")),
        "trailing_pe": key_metrics.get("trailing_pe"),
        "forward_pe": key_metrics.get("forward_pe"),
    }
    profitability = {
        "profit_margins": extract_raw_value(financial_data.get("profitMargins")),
        "operating_margins": extract_raw_value(financial_data.get("operatingMargins")),
        "gross_margins": extract_raw_value(financial_data.get("grossMargins")),
        "ebitda_margins": extract_raw_value(financial_data.get("ebitdaMargins")),
        "return_on_assets": extract_raw_value(financial_data.get("returnOnAssets")),
        "return_on_equity": extract_raw_value(financial_data.get("returnOnEquity")),
    }
    growth = {
        "revenue_growth": extract_raw_value(financial_data.get("revenueGrowth")),
        "earnings_growth": extract_raw_value(financial_data.get("earningsGrowth")),
        "revenue_per_share": extract_raw_value(financial_data.get("revenuePerShare")),
    }
    financial_strength = {
        "total_cash": extract_raw_value(financial_data.get("totalCash")),
        "total_debt": extract_raw_value(financial_data.get("totalDebt")),
        "debt_to_equity": extract_raw_value(financial_data.get("debtToEquity")),
        "current_ratio": extract_raw_value(financial_data.get("currentRatio")),
        "quick_ratio": extract_raw_value(financial_data.get("quickRatio")),
    }
    cashflow = {
        "operating_cashflow": extract_raw_value(financial_data.get("operatingCashflow")),
        "free_cashflow": extract_raw_value(financial_data.get("freeCashflow")),
        "ebitda": extract_raw_value(financial_data.get("ebitda")),
    }
    ownership = {
        "shares_outstanding": extract_raw_value(key_stats.get("sharesOutstanding")),
        "float_shares": extract_raw_value(key_stats.get("floatShares")),
        "shares_short": extract_raw_value(key_stats.get("sharesShort")),
        "short_ratio": extract_raw_value(key_stats.get("shortRatio")),
        "short_percent_of_float": extract_raw_value(key_stats.get("shortPercentOfFloat")),
        "held_percent_insiders": extract_raw_value(key_stats.get("heldPercentInsiders")),
        "held_percent_institutions": extract_raw_value(
            key_stats.get("heldPercentInstitutions")
        ),
    }
    analyst = {
        "recommendation": {
            "key": key_metrics.get("recommendation_key"),
            "mean": key_metrics.get("recommendation_mean"),
            "analyst_opinion_count": key_metrics.get("analyst_opinion_count"),
            "target_price_high": key_metrics.get("target_price_high"),
            "target_price_low": key_metrics.get("target_price_low"),
            "target_price_mean": key_metrics.get("target_price_mean"),
            "target_price_median": key_metrics.get("target_price_median"),
        },
        "trend": simplify_recommendation_trend(recommendation_trend.get("trend")),
        "upgrades_downgrades": simplify_upgrade_history(
            upgrade_history.get("history"), limit=20
        ),
        "ratings_top": simplify_ratings_top(ratings_payload),
    }
    earnings_summary = summarize_earnings(earnings, calendar_events)
    performance_summary = {
        "equity_performance": summarize_performance(equity_performance),
        "unadjusted_performance": summarize_performance(performance_overview),
    }
    matched_symbols = []
    for candidate in [
        price_data.get("symbol") if price_data else None,
        quote.get("symbol") if quote else None,
        quote_type.get("symbol") if quote_type else None,
    ]:
        if candidate:
            matched_symbols.append(candidate)
    symbol_match = None
    if matched_symbols:
        symbol_match = any(
            candidate.upper() == symbol.upper() for candidate in matched_symbols
        )
    issues = []
    if not quote_summary:
        issues.append("missing_quote_summary")
    if matched_symbols and not symbol_match:
        issues.append("symbol_mismatch")
    if not quote:
        issues.append("missing_quote_data")
    if not quote_type:
        issues.append("missing_quote_type")
    validation = {
        "requested_symbol": symbol,
        "matched_symbols": matched_symbols,
        "symbol_match": symbol_match,
        "issues": issues,
    }
    if "missing_quote_summary" in issues or "symbol_mismatch" in issues:
        return {
            "error": "Profile validation failed",
            "stock": symbol,
            "url": url,
            "validation": validation,
            "data_sources": {
                "quote_summary": quote_summary_url,
                "quote": quote_url,
                "quote_type": quote_type_url,
                "ratings_top": ratings_url,
                "recommendations": recs_url,
            },
        }
    return {
        "stock": symbol,
        "url": url,
        "fetched_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
        "validation": validation,
        "key_metrics": key_metrics,
        "valuation": valuation,
        "profitability": profitability,
        "growth": growth,
        "financial_strength": financial_strength,
        "cashflow": cashflow,
        "ownership": ownership,
        "analyst": analyst,
        "earnings": earnings_summary,
        "performance": performance_summary,
        "data_sources": {
            "quote_summary": quote_summary_url,
            "quote": quote_url,
            "quote_type": quote_type_url,
            "ratings_top": ratings_url,
            "recommendations": recs_url,
        },
    }

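
# On success, scrape_yahoo_profile returns a JSON-serializable dict whose
# top-level keys (per the return above) are: stock, url, fetched_at,
# validation, key_metrics, valuation, profitability, growth,
# financial_strength, cashflow, ownership, analyst, earnings, performance,
# and data_sources. Error paths instead return a dict with an "error" key
# plus whatever context was available at the point of failure.
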

def scrape_yahoo_options(symbol, expiration=None, strike_limit=25):
    def parse_table(table_html, side):
        if not table_html:
            app.logger.warning("No %s table HTML for %s", side, symbol)
            return []
        soup = BeautifulSoup(table_html, "html.parser")
        headers = [th.get_text(strip=True) for th in soup.select("thead th")]
        rows = soup.select("tbody tr")
        parsed = []
        for r in rows:
            tds = r.find_all("td")
            if len(tds) != len(headers):
                continue
            item = {}
            for i, c in enumerate(tds):
                key = headers[i]
                val = c.get_text(" ", strip=True)
                # Convert numeric fields
                if key in ["Strike", "Last Price", "Bid", "Ask", "Change"]:
                    try:
                        val = float(val.replace(",", ""))
                    except Exception:
                        val = None
                elif key in ["Volume", "Open Interest"]:
                    try:
                        val = int(val.replace(",", ""))
                    except Exception:
                        val = None
                elif val in ["-", ""]:
                    val = None
                item[key] = val
            parsed.append(item)
        app.logger.info("Parsed %d %s rows", len(parsed), side)
        return parsed

    def read_option_chain(page):
        # Local renamed from `html` to avoid shadowing the imported module.
        page_html = page.content()
        option_chain = extract_option_chain_from_html(page_html)
        if option_chain:
            expiration_dates = extract_expiration_dates_from_chain(option_chain)
        else:
            expiration_dates = extract_expiration_dates_from_html(page_html)
        return option_chain, expiration_dates

    def has_expected_expiry(options, expected_code):
        if not expected_code:
            return False
        for row in options or []:
            name = row.get("Contract Name")
            if extract_contract_expiry_code(name) == expected_code:
                return True
        return False

    encoded = urllib.parse.quote(symbol, safe="")
    base_url = f"https://finance.yahoo.com/quote/{encoded}/options/"
    requested_expiration = expiration.strip() if expiration else None
    if not requested_expiration:
        requested_expiration = None
    url = base_url
    app.logger.info(
        "Starting scrape for symbol=%s expiration=%s url=%s",
        symbol,
        requested_expiration,
        base_url,
    )
    calls_html = None
    puts_html = None
    calls_full = []
    puts_full = []
    price = None
    selected_expiration_value = None
    selected_expiration_label = None
    expiration_options = []
    target_date = None
    fallback_to_base = False
    with sync_playwright() as p:
        launch_args = chromium_launch_args()
        if launch_args:
            app.logger.info("GPU acceleration enabled")
        else:
            app.logger.info("GPU acceleration disabled")
        browser = p.chromium.launch(headless=True, args=launch_args)
        page = browser.new_page()
        page.set_extra_http_headers(
            {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
                )
            }
        )
        page.set_default_timeout(60000)
        try:
            if requested_expiration:
                if requested_expiration.isdigit():
                    target_date = int(requested_expiration)
                    selected_expiration_value = target_date
                    selected_expiration_label = format_expiration_label(target_date)
                else:
                    parsed_date = parse_date(requested_expiration)
                    if parsed_date:
                        target_date = int(
                            datetime(
                                parsed_date.year,
                                parsed_date.month,
                                parsed_date.day,
                                tzinfo=timezone.utc,
                            ).timestamp()
                        )
                        selected_expiration_value = target_date
                        selected_expiration_label = format_expiration_label(target_date)
                    else:
                        fallback_to_base = True
            if target_date:
                url = f"{base_url}?date={target_date}"
            page.goto(url, wait_until="domcontentloaded", timeout=60000)
            app.logger.info("Page loaded (domcontentloaded) for %s", symbol)
            option_chain, expiration_dates = read_option_chain(page)
            app.logger.info("Option chain found: %s", bool(option_chain))
            expiration_options = build_expiration_options(expiration_dates)
            if fallback_to_base:
                resolved_value, resolved_label = resolve_expiration(
                    requested_expiration, expiration_options
                )
                if resolved_value is None:
                    return {
                        "error": "Requested expiration not available",
                        "stock": symbol,
                        "requested_expiration": requested_expiration,
                        "available_expirations": [
                            {"label": opt.get("label"), "value": opt.get("value")}
                            for opt in expiration_options
                        ],
                    }
                target_date = resolved_value
                selected_expiration_value = resolved_value
                selected_expiration_label = resolved_label or format_expiration_label(
                    resolved_value
                )
                url = f"{base_url}?date={resolved_value}"
                page.goto(url, wait_until="domcontentloaded", timeout=60000)
                app.logger.info("Page loaded (domcontentloaded) for %s", symbol)
                option_chain, expiration_dates = read_option_chain(page)
                expiration_options = build_expiration_options(expiration_dates)
            if target_date and expiration_options:
                matched = None
                for opt in expiration_options:
                    if opt.get("value") == target_date:
                        matched = opt
                        break
                if not matched:
                    return {
                        "error": "Requested expiration not available",
                        "stock": symbol,
                        "requested_expiration": requested_expiration,
                        "available_expirations": [
                            {"label": opt.get("label"), "value": opt.get("value")}
                            for opt in expiration_options
                        ],
                    }
                selected_expiration_value = matched.get("value")
                selected_expiration_label = matched.get("label")
            elif expiration_options and not target_date:
                selected_expiration_value = expiration_options[0].get("value")
                selected_expiration_label = expiration_options[0].get("label")
            calls_full, puts_full = build_rows_from_chain(option_chain)
            app.logger.info(
                "Option chain rows: calls=%d puts=%d",
                len(calls_full),
                len(puts_full),
            )
            if not calls_full and not puts_full:
                app.logger.info("Waiting for options tables...")
                tables = wait_for_tables(page)
                if len(tables) < 2:
                    app.logger.error(
                        "Only %d tables found; expected 2. HTML may have changed.",
                        len(tables),
                    )
                    return {"error": "Could not locate options tables", "stock": symbol}
                app.logger.info("Found %d tables. Extracting Calls & Puts.", len(tables))
                calls_html = tables[0].evaluate("el => el.outerHTML")
                puts_html = tables[1].evaluate("el => el.outerHTML")
            # --- Extract current price ---
            try:
                # Primary selector
                price_text = page.locator(
                    "fin-streamer[data-field='regularMarketPrice']"
                ).inner_text()
                price = float(price_text.replace(",", ""))
            except Exception:
                try:
                    # Fallback
                    price_text = page.locator("span[data-testid='qsp-price']").inner_text()
                    price = float(price_text.replace(",", ""))
                except Exception as e:
                    app.logger.warning("Failed to extract price for %s: %s", symbol, e)
            app.logger.info("Current price for %s = %s", symbol, price)
        finally:
            browser.close()
    if not calls_full and not puts_full and calls_html and puts_html:
        calls_full = parse_table(calls_html, "calls")
        puts_full = parse_table(puts_html, "puts")
    expected_code = expected_expiry_code(target_date)
    if expected_code:
        if not has_expected_expiry(calls_full, expected_code) and not has_expected_expiry(
            puts_full, expected_code
        ):
            return {
                "error": "Options chain does not match requested expiration",
                "stock": symbol,
                "requested_expiration": requested_expiration,
                "expected_expiration_code": expected_code,
                "selected_expiration": {
                    "value": selected_expiration_value,
                    "label": selected_expiration_label,
                },
            }

    # ----------------------------------------------------------------------
    # Pruning logic
    # ----------------------------------------------------------------------
    def prune_nearest(options, price_value, limit=25, side=""):
        if price_value is None:
            return options, 0
        numeric = [o for o in options if isinstance(o.get("Strike"), (int, float))]
        if len(numeric) <= limit:
            return numeric, 0
        sorted_opts = sorted(numeric, key=lambda x: abs(x["Strike"] - price_value))
        pruned = sorted_opts[:limit]
        pruned_count = len(options) - len(pruned)
        return pruned, pruned_count

    calls, pruned_calls = prune_nearest(
        calls_full,
        price,
        limit=strike_limit,
        side="calls",
    )
    puts, pruned_puts = prune_nearest(
        puts_full,
        price,
        limit=strike_limit,
        side="puts",
    )

    def strike_range(opts):
        strikes = [o["Strike"] for o in opts if isinstance(o.get("Strike"), (int, float))]
        return [min(strikes), max(strikes)] if strikes else [None, None]

    return {
        "stock": symbol,
        "url": url,
        "requested_expiration": requested_expiration,
        "selected_expiration": {
            "value": selected_expiration_value,
            "label": selected_expiration_label,
        },
        "current_price": price,
        "calls": calls,
        "puts": puts,
        "calls_strike_range": strike_range(calls),
        "puts_strike_range": strike_range(puts),
        "total_calls": len(calls),
        "total_puts": len(puts),
        "pruned_calls_count": pruned_calls,
        "pruned_puts_count": pruned_puts,
    }


@app.route("/scrape_sync")
def scrape_sync():
    symbol = request.args.get("stock", "MSFT")
    expiration = (
        request.args.get("expiration")
        or request.args.get("expiry")
        or request.args.get("date")
    )
    strike_limit = parse_strike_limit(request.args.get("strikeLimit"), default=25)
    app.logger.info(
        "Received /scrape_sync request for symbol=%s expiration=%s strike_limit=%s",
        symbol,
        expiration,
        strike_limit,
    )
    return jsonify(scrape_yahoo_options(symbol, expiration, strike_limit))


@app.route("/profile")
def profile():
    symbol = request.args.get("stock", "MSFT")
    app.logger.info("Received /profile request for symbol=%s", symbol)
    return jsonify(scrape_yahoo_profile(symbol))


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9777)
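
# Example requests (assuming the server is running locally on port 9777; the
# ticker and date values are illustrative):
#
#   curl "http://localhost:9777/scrape_sync?stock=AAPL&strikeLimit=10"
#   curl "http://localhost:9777/scrape_sync?stock=AAPL&expiration=2025-06-20"
#   curl "http://localhost:9777/profile?stock=AAPL"
#
# "expiration" accepts a Unix timestamp, a date in any DATE_FORMATS pattern,
# or a label matching one shown on the options page; "expiry" and "date" are
# accepted as aliases per the /scrape_sync handler.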