"""Yahoo Finance options-chain scraper exposed as a tiny Flask service.

Strategy: load the Yahoo options page with Playwright, prefer the JSON
"optionChain" payload embedded in the page HTML, and fall back to parsing
the rendered HTML tables with BeautifulSoup when the payload is absent.
Results are pruned to the strikes nearest the current market price.
"""

from flask import Flask, jsonify, request
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from datetime import datetime, timezone
import urllib.parse
import logging
import json
import re
import time

app = Flask(__name__)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
app.logger.setLevel(logging.INFO)

# Accepted textual formats for a user-supplied expiration date.
DATE_FORMATS = (
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y%m%d",
    "%b %d, %Y",
    "%B %d, %Y",
)


def _utc_datetime(timestamp):
    """UTC datetime for a UNIX *timestamp*.

    Replacement for ``datetime.utcfromtimestamp`` (deprecated since
    Python 3.12); produces identical wall-clock fields.
    """
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


def parse_date(value):
    """Parse *value* against DATE_FORMATS; return a ``date`` or None."""
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            continue
    return None


def normalize_label(value):
    """Lowercase *value* and collapse all internal whitespace to single spaces."""
    return " ".join(value.strip().split()).lower()


def format_expiration_label(timestamp):
    """Format a UNIX timestamp as a UTC ``YYYY-MM-DD`` label.

    Falls back to ``str(timestamp)`` for unparseable/out-of-range input.
    """
    try:
        return _utc_datetime(timestamp).strftime("%Y-%m-%d")
    except Exception:
        return str(timestamp)


def format_percent(value):
    """Render a 0..1 fraction as a percent string, e.g. 0.1234 -> '12.34%'."""
    if value is None:
        return None
    try:
        return f"{value * 100:.2f}%"
    except Exception:
        return None


def extract_raw_value(value):
    """Unwrap Yahoo's ``{"raw": ..., "fmt": ...}`` wrapper; pass scalars through."""
    if isinstance(value, dict):
        return value.get("raw")
    return value


def extract_fmt_value(value):
    """Return the pre-formatted ``fmt`` field of a Yahoo value dict, else None."""
    if isinstance(value, dict):
        return value.get("fmt")
    return None


def format_percent_value(value):
    """Prefer Yahoo's own ``fmt`` string; otherwise format the raw fraction."""
    fmt = extract_fmt_value(value)
    if fmt is not None:
        return fmt
    return format_percent(extract_raw_value(value))


def format_last_trade_date(timestamp):
    """Format a last-trade UNIX timestamp like '01/31/2024 03:30 PM EST'.

    NOTE(review): the conversion uses the server-local timezone
    (naive ``fromtimestamp``) while the label hard-codes "EST" — only
    accurate when the host runs in US/Eastern; confirm deployment TZ.
    """
    timestamp = extract_raw_value(timestamp)
    if not timestamp:
        return None
    try:
        return datetime.fromtimestamp(timestamp).strftime("%m/%d/%Y %I:%M %p") + " EST"
    except Exception:
        return None


def extract_option_chain_from_html(html):
    """Find the embedded ``optionChain`` JSON payload inside page HTML.

    Yahoo embeds API responses as JSON-escaped strings after a
    ``"body":"`` token.  For each occurrence: manually un-escape the
    string (honoring backslash escapes), JSON-decode it twice (once to
    get the body text, once to parse that text as JSON), and return the
    first ``optionChain`` object that has a non-empty ``result``.
    Returns None when no candidate matches.
    """
    if not html:
        return None
    token = "\"body\":\""
    start = 0
    while True:
        idx = html.find(token, start)
        if idx == -1:
            break
        # Scan the escaped JSON string character by character until the
        # closing unescaped double-quote.
        i = idx + len(token)
        escaped = False
        raw_chars = []
        while i < len(html):
            ch = html[i]
            if escaped:
                raw_chars.append(ch)
                escaped = False
            else:
                if ch == "\\":
                    raw_chars.append(ch)
                    escaped = True
                elif ch == "\"":
                    break
                else:
                    raw_chars.append(ch)
            i += 1
        raw = "".join(raw_chars)
        try:
            # First decode: turn the escaped JS string into plain text.
            body_text = json.loads(f"\"{raw}\"")
        except json.JSONDecodeError:
            start = idx + len(token)
            continue
        # Cheap substring filter before paying for a full JSON parse.
        if "optionChain" not in body_text:
            start = idx + len(token)
            continue
        try:
            # Second decode: the body text itself is a JSON document.
            payload = json.loads(body_text)
        except json.JSONDecodeError:
            start = idx + len(token)
            continue
        option_chain = payload.get("optionChain")
        if option_chain and option_chain.get("result"):
            return option_chain
        start = idx + len(token)
    return None


def extract_expiration_dates_from_chain(chain):
    """Return the ``expirationDates`` timestamp list from an optionChain dict."""
    if not chain:
        return []
    result = chain.get("result", [])
    if not result:
        return []
    return result[0].get("expirationDates", []) or []


def normalize_chain_rows(rows):
    """Convert raw optionChain contract rows to flat display-keyed dicts."""
    normalized = []
    for row in rows or []:
        normalized.append(
            {
                "Contract Name": row.get("contractSymbol"),
                "Last Trade Date (EST)": format_last_trade_date(
                    row.get("lastTradeDate")
                ),
                "Strike": extract_raw_value(row.get("strike")),
                "Last Price": extract_raw_value(row.get("lastPrice")),
                "Bid": extract_raw_value(row.get("bid")),
                "Ask": extract_raw_value(row.get("ask")),
                "Change": extract_raw_value(row.get("change")),
                "% Change": format_percent_value(row.get("percentChange")),
                "Volume": extract_raw_value(row.get("volume")),
                "Open Interest": extract_raw_value(row.get("openInterest")),
                "Implied Volatility": format_percent_value(
                    row.get("impliedVolatility")
                ),
            }
        )
    return normalized


def build_rows_from_chain(chain):
    """Return (calls, puts) normalized row lists from an optionChain dict.

    Returns ``([], [])`` when the chain is missing or has no options.
    """
    result = chain.get("result", []) if chain else []
    if not result:
        return [], []
    options = result[0].get("options", [])
    if not options:
        return [], []
    option = options[0]
    return (
        normalize_chain_rows(option.get("calls")),
        normalize_chain_rows(option.get("puts")),
    )


def extract_contract_expiry_code(contract_name):
    """Return the first 6-digit run (YYMMDD expiry code) in an OCC symbol."""
    if not contract_name:
        return None
    match = re.search(r"(\d{6})", contract_name)
    return match.group(1) if match else None


def expected_expiry_code(timestamp):
    """Return the UTC YYMMDD code for a UNIX expiration timestamp, or None."""
    if not timestamp:
        return None
    try:
        return _utc_datetime(timestamp).strftime("%y%m%d")
    except Exception:
        return None


def extract_expiration_dates_from_html(html):
    """Regex fallback: pull ``expirationDates`` integers straight from HTML.

    Tries the backslash-escaped (embedded JSON string) form first, then
    the plain-JSON form.
    """
    if not html:
        return []
    patterns = (
        r'\\"expirationDates\\":\[(.*?)\]',
        r'"expirationDates":\[(.*?)\]',
    )
    match = None
    for pattern in patterns:
        match = re.search(pattern, html, re.DOTALL)
        if match:
            break
    if not match:
        return []
    raw = match.group(1)
    values = []
    for part in raw.split(","):
        part = part.strip()
        if part.isdigit():
            # isdigit() is true for some non-ASCII digits int() rejects,
            # so keep the defensive try.
            try:
                values.append(int(part))
            except Exception:
                continue
    return values


def build_expiration_options(expiration_dates):
    """Map UNIX timestamps to sorted {value, label, date} option dicts."""
    options = []
    for value in expiration_dates or []:
        try:
            value_int = int(value)
        except Exception:
            continue
        label = format_expiration_label(value_int)
        try:
            date_value = _utc_datetime(value_int).date()
        except Exception:
            date_value = None
        options.append({"value": value_int, "label": label, "date": date_value})
    return sorted(options, key=lambda x: x["value"])


def resolve_expiration(expiration, options):
    """Resolve a user expiration string against available options.

    Accepts a raw UNIX timestamp, any DATE_FORMATS date string, or a
    label match (case/whitespace-insensitive).  Returns
    ``(value, label)`` or ``(None, None)`` when unresolvable.
    """
    if not expiration:
        return None, None
    raw = expiration.strip()
    if not raw:
        return None, None
    if raw.isdigit():
        value = int(raw)
        if options:
            for opt in options:
                if opt.get("value") == value:
                    return value, opt.get("label")
            return None, None
        # No option list to validate against: trust the timestamp.
        return value, format_expiration_label(value)
    requested_date = parse_date(raw)
    if requested_date:
        for opt in options:
            if opt.get("date") == requested_date:
                return opt.get("value"), opt.get("label")
        return None, None
    normalized = normalize_label(raw)
    for opt in options:
        if normalize_label(opt.get("label", "")) == normalized:
            return opt.get("value"), opt.get("label")
    return None, None


def wait_for_tables(page):
    """Wait (up to ~60s total) for the calls & puts tables to render.

    Prefers the dedicated options-list section selector and falls back
    to any ``table``.  Returns the element handles (>= 2) or ``[]``.
    """
    try:
        page.wait_for_selector(
            "section[data-testid='options-list-table'] table",
            timeout=30000,
        )
    except Exception:
        page.wait_for_selector("table", timeout=30000)
    for _ in range(30):  # 30 * 1s = 30 seconds
        tables = page.query_selector_all(
            "section[data-testid='options-list-table'] table"
        )
        if len(tables) >= 2:
            return tables
        tables = page.query_selector_all("table")
        if len(tables) >= 2:
            return tables
        time.sleep(1)
    return []


def scrape_yahoo_options(symbol, expiration=None):
    """Scrape the Yahoo options chain for *symbol*.

    ``expiration`` may be None (nearest expiry), a UNIX timestamp
    string, a date string, or an expiration label.  Returns a
    JSON-serializable dict with pruned calls/puts, price, strike ranges
    and counts, or an ``{"error": ...}`` dict on failure.
    """

    def parse_table(table_html, side):
        # Fallback path: parse a rendered HTML table into row dicts,
        # coercing numeric columns.
        if not table_html:
            app.logger.warning("No %s table HTML for %s", side, symbol)
            return []
        soup = BeautifulSoup(table_html, "html.parser")
        headers = [th.get_text(strip=True) for th in soup.select("thead th")]
        rows = soup.select("tbody tr")
        parsed = []
        for r in rows:
            tds = r.find_all("td")
            # Skip malformed rows (e.g. spanning/ad rows).
            if len(tds) != len(headers):
                continue
            item = {}
            for i, c in enumerate(tds):
                key = headers[i]
                val = c.get_text(" ", strip=True)
                # Convert numeric fields
                if key in ["Strike", "Last Price", "Bid", "Ask", "Change"]:
                    try:
                        val = float(val.replace(",", ""))
                    except Exception:
                        val = None
                elif key in ["Volume", "Open Interest"]:
                    try:
                        val = int(val.replace(",", ""))
                    except Exception:
                        val = None
                elif val in ["-", ""]:
                    val = None
                item[key] = val
            parsed.append(item)
        app.logger.info("Parsed %d %s rows", len(parsed), side)
        return parsed

    def read_option_chain(page):
        # Prefer the embedded JSON payload; fall back to the regex scan
        # for expiration dates only.
        html = page.content()
        option_chain = extract_option_chain_from_html(html)
        if option_chain:
            expiration_dates = extract_expiration_dates_from_chain(option_chain)
        else:
            expiration_dates = extract_expiration_dates_from_html(html)
        return option_chain, expiration_dates

    def has_expected_expiry(options, expected_code):
        # True when any contract symbol carries the expected YYMMDD code.
        if not expected_code:
            return False
        for row in options or []:
            name = row.get("Contract Name")
            if extract_contract_expiry_code(name) == expected_code:
                return True
        return False

    encoded = urllib.parse.quote(symbol, safe="")
    base_url = f"https://finance.yahoo.com/quote/{encoded}/options/"
    # Empty/whitespace-only expiration is treated the same as None.
    requested_expiration = expiration.strip() if expiration else None
    requested_expiration = requested_expiration or None
    url = base_url
    app.logger.info(
        "Starting scrape for symbol=%s expiration=%s url=%s",
        symbol,
        requested_expiration,
        base_url,
    )
    calls_html = None
    puts_html = None
    calls_full = []
    puts_full = []
    price = None
    selected_expiration_value = None
    selected_expiration_label = None
    expiration_options = []
    target_date = None
    # True when the expiration string must be resolved against the live
    # option list (label match) rather than computed up front.
    fallback_to_base = False

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.set_extra_http_headers(
            {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
                )
            }
        )
        page.set_default_timeout(60000)
        try:
            if requested_expiration:
                if requested_expiration.isdigit():
                    # Raw UNIX timestamp: use it directly.
                    target_date = int(requested_expiration)
                    selected_expiration_value = target_date
                    selected_expiration_label = format_expiration_label(target_date)
                else:
                    parsed_date = parse_date(requested_expiration)
                    if parsed_date:
                        # Date string: convert to a UTC-midnight timestamp.
                        target_date = int(
                            datetime(
                                parsed_date.year,
                                parsed_date.month,
                                parsed_date.day,
                                tzinfo=timezone.utc,
                            ).timestamp()
                        )
                        selected_expiration_value = target_date
                        selected_expiration_label = format_expiration_label(target_date)
                    else:
                        fallback_to_base = True
            if target_date:
                url = f"{base_url}?date={target_date}"
            page.goto(url, wait_until="domcontentloaded", timeout=60000)
            app.logger.info("Page loaded (domcontentloaded) for %s", symbol)
            option_chain, expiration_dates = read_option_chain(page)
            app.logger.info("Option chain found: %s", bool(option_chain))
            expiration_options = build_expiration_options(expiration_dates)

            if fallback_to_base:
                # Resolve the label/date against the expirations the page
                # advertises, then reload with the explicit ?date=.
                resolved_value, resolved_label = resolve_expiration(
                    requested_expiration, expiration_options
                )
                if resolved_value is None:
                    return {
                        "error": "Requested expiration not available",
                        "stock": symbol,
                        "requested_expiration": requested_expiration,
                        "available_expirations": [
                            {"label": opt.get("label"), "value": opt.get("value")}
                            for opt in expiration_options
                        ],
                    }
                target_date = resolved_value
                selected_expiration_value = resolved_value
                selected_expiration_label = resolved_label or format_expiration_label(
                    resolved_value
                )
                url = f"{base_url}?date={resolved_value}"
                page.goto(url, wait_until="domcontentloaded", timeout=60000)
                app.logger.info("Page loaded (domcontentloaded) for %s", symbol)
                option_chain, expiration_dates = read_option_chain(page)
                expiration_options = build_expiration_options(expiration_dates)

            if target_date and expiration_options:
                # Verify the requested timestamp is actually offered.
                matched = None
                for opt in expiration_options:
                    if opt.get("value") == target_date:
                        matched = opt
                        break
                if not matched:
                    return {
                        "error": "Requested expiration not available",
                        "stock": symbol,
                        "requested_expiration": requested_expiration,
                        "available_expirations": [
                            {"label": opt.get("label"), "value": opt.get("value")}
                            for opt in expiration_options
                        ],
                    }
                selected_expiration_value = matched.get("value")
                selected_expiration_label = matched.get("label")
            elif expiration_options and not target_date:
                # No expiration requested: Yahoo serves the nearest one.
                selected_expiration_value = expiration_options[0].get("value")
                selected_expiration_label = expiration_options[0].get("label")

            calls_full, puts_full = build_rows_from_chain(option_chain)
            app.logger.info(
                "Option chain rows: calls=%d puts=%d",
                len(calls_full),
                len(puts_full),
            )
            if not calls_full and not puts_full:
                # JSON payload missing/empty: fall back to the DOM tables.
                app.logger.info("Waiting for options tables...")
                tables = wait_for_tables(page)
                if len(tables) < 2:
                    app.logger.error(
                        "Only %d tables found; expected 2. HTML may have changed.",
                        len(tables),
                    )
                    return {"error": "Could not locate options tables", "stock": symbol}
                app.logger.info("Found %d tables. Extracting Calls & Puts.", len(tables))
                calls_html = tables[0].evaluate("el => el.outerHTML")
                puts_html = tables[1].evaluate("el => el.outerHTML")

            # --- Extract current price ---
            try:
                # Primary selector
                price_text = page.locator(
                    "fin-streamer[data-field='regularMarketPrice']"
                ).inner_text()
                price = float(price_text.replace(",", ""))
            except Exception:
                try:
                    # Fallback
                    price_text = page.locator(
                        "span[data-testid='qsp-price']"
                    ).inner_text()
                    price = float(price_text.replace(",", ""))
                except Exception as e:
                    app.logger.warning(
                        "Failed to extract price for %s: %s", symbol, e
                    )
            app.logger.info("Current price for %s = %s", symbol, price)
        finally:
            # Always release the browser, even on scrape errors/returns.
            browser.close()

    if not calls_full and not puts_full and calls_html and puts_html:
        calls_full = parse_table(calls_html, "calls")
        puts_full = parse_table(puts_html, "puts")

    # Sanity check: the contracts returned must carry the requested
    # expiry code, otherwise Yahoo silently served a different chain.
    expected_code = expected_expiry_code(target_date)
    if expected_code:
        if not has_expected_expiry(calls_full, expected_code) and not has_expected_expiry(
            puts_full, expected_code
        ):
            return {
                "error": "Options chain does not match requested expiration",
                "stock": symbol,
                "requested_expiration": requested_expiration,
                "expected_expiration_code": expected_code,
                "selected_expiration": {
                    "value": selected_expiration_value,
                    "label": selected_expiration_label,
                },
            }

    # ----------------------------------------------------------------------
    # Pruning logic
    # ----------------------------------------------------------------------
    def prune_nearest(options, price_value, limit=26, side=""):
        # Keep the `limit` strikes nearest the current price; rows with
        # non-numeric strikes are dropped.  `side` is currently unused
        # (kept for call-site symmetry with the parsing helpers).
        if price_value is None:
            return options, 0
        numeric = [o for o in options if isinstance(o.get("Strike"), (int, float))]
        if len(numeric) <= limit:
            return numeric, 0
        sorted_opts = sorted(numeric, key=lambda x: abs(x["Strike"] - price_value))
        pruned = sorted_opts[:limit]
        pruned_count = len(options) - len(pruned)
        return pruned, pruned_count

    calls, pruned_calls = prune_nearest(calls_full, price, side="calls")
    puts, pruned_puts = prune_nearest(puts_full, price, side="puts")

    def strike_range(opts):
        # [min, max] over numeric strikes; [None, None] when empty.
        strikes = [
            o["Strike"] for o in opts if isinstance(o.get("Strike"), (int, float))
        ]
        return [min(strikes), max(strikes)] if strikes else [None, None]

    return {
        "stock": symbol,
        "url": url,
        "requested_expiration": requested_expiration,
        "selected_expiration": {
            "value": selected_expiration_value,
            "label": selected_expiration_label,
        },
        "current_price": price,
        "calls": calls,
        "puts": puts,
        "calls_strike_range": strike_range(calls),
        "puts_strike_range": strike_range(puts),
        "total_calls": len(calls),
        "total_puts": len(puts),
        "pruned_calls_count": pruned_calls,
        "pruned_puts_count": pruned_puts,
    }


@app.route("/scrape_sync")
def scrape_sync():
    """HTTP endpoint: scrape options for ?stock= and optional expiration.

    The expiration may be passed as ``expiration``, ``expiry`` or
    ``date``; defaults to symbol MSFT.
    """
    symbol = request.args.get("stock", "MSFT")
    expiration = (
        request.args.get("expiration")
        or request.args.get("expiry")
        or request.args.get("date")
    )
    app.logger.info(
        "Received /scrape_sync request for symbol=%s expiration=%s",
        symbol,
        expiration,
    )
    return jsonify(scrape_yahoo_options(symbol, expiration))


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9777)