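# Yahoo Finance options scraper: a Flask endpoint that drives headless
# Chromium via Playwright and parses the options tables with BeautifulSoup.
#
# Setup sketch (assumed environment; adjust as needed):
#   pip install flask playwright beautifulsoup4
#   playwright install chromium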
from flask import Flask, jsonify, request
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
import urllib.parse
import logging
import time

app = Flask(__name__)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
app.logger.setLevel(logging.INFO)


def scrape_yahoo_options(symbol):
    encoded = urllib.parse.quote(symbol, safe="")
    url = f"https://finance.yahoo.com/quote/{encoded}/options/"

    app.logger.info("Starting scrape for symbol=%s url=%s", symbol, url)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
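        # A desktop User-Agent makes the headless browser look like an
        # ordinary Chrome session, which in practice reduces bot/consent
        # interstitials.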
        page.set_extra_http_headers({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
            )
        })

        page.goto(url, wait_until="domcontentloaded", timeout=60000)
        app.logger.info("Page loaded (domcontentloaded) for %s", symbol)

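        # "domcontentloaded" fires before Yahoo's client-side rendering has
        # filled in the tables, hence the explicit waits below.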
        # --- FIXED: Yahoo changed all classnames. We no longer depend on them. ---
        # We simply wait until at least TWO <table> tags appear.
        app.logger.info("Waiting for options tables...")

        # Wait for any table to exist
        page.wait_for_selector("table", timeout=30000)

        # Repeatedly check until 2 tables appear
        for _ in range(30):  # 30 × 1s = 30 seconds
            tables = page.query_selector_all("table")
            if len(tables) >= 2:
                break
            time.sleep(1)

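        # A poll-free alternative (sketch, using Playwright's
        # wait_for_function; behaviour assumed equivalent to the loop above):
        #
        #     page.wait_for_function(
        #         "document.querySelectorAll('table').length >= 2",
        #         timeout=30000,
        #     )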
        tables = page.query_selector_all("table")
        if len(tables) < 2:
            app.logger.error("Only %d tables found; expected 2. HTML changed?", len(tables))
            browser.close()
            return {"error": "Could not locate options tables", "stock": symbol}

app.logger.info("Found %d tables. Extracting Calls & Puts.", len(tables))
|
||
|
||
calls_html = tables[0].evaluate("el => el.outerHTML")
|
||
puts_html = tables[1].evaluate("el => el.outerHTML")
|
||
|
||
        # --- Extract current price ---
        price = None
        try:
            # Primary selector
            price_text = page.locator("fin-streamer[data-field='regularMarketPrice']").inner_text()
            price = float(price_text.replace(",", ""))
        except Exception:
            try:
                # Fallback
                price_text = page.locator("span[data-testid='qsp-price']").inner_text()
                price = float(price_text.replace(",", ""))
            except Exception as e:
                app.logger.warning("Failed to extract price for %s: %s", symbol, e)

        app.logger.info("Current price for %s = %s", symbol, price)

        browser.close()

    # ----------------------------------------------------------------------
    # Parsing Table HTML
    # ----------------------------------------------------------------------
    def parse_table(table_html, side):
        if not table_html:
            app.logger.warning("No %s table HTML for %s", side, symbol)
            return []

        soup = BeautifulSoup(table_html, "html.parser")

        headers = [th.get_text(strip=True) for th in soup.select("thead th")]
        rows = soup.select("tbody tr")

        parsed = []
        for r in rows:
            tds = r.find_all("td")
            if len(tds) != len(headers):
                continue

            item = {}
            for i, c in enumerate(tds):
                key = headers[i]
                val = c.get_text(" ", strip=True)

                # Convert numeric fields
                if key in ["Strike", "Last Price", "Bid", "Ask", "Change"]:
                    try:
                        val = float(val.replace(",", ""))
                    except ValueError:
                        val = None
                elif key in ["Volume", "Open Interest"]:
                    try:
                        val = int(val.replace(",", ""))
                    except ValueError:
                        val = None
                elif val in ["-", ""]:
                    val = None

                item[key] = val

            parsed.append(item)

        app.logger.info("Parsed %d %s rows", len(parsed), side)
        return parsed

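    # Each parsed row is keyed by Yahoo's own column headers. Illustrative
    # shape only (hypothetical values; the real headers come from the page):
    #   {"Strike": 150.0, "Last Price": 4.1, "Bid": 4.0, "Ask": 4.2,
    #    "Volume": 312, "Open Interest": 1045, ...}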
    calls_full = parse_table(calls_html, "calls")
    puts_full = parse_table(puts_html, "puts")

    # ----------------------------------------------------------------------
    # Pruning logic
    # ----------------------------------------------------------------------
    def prune_nearest(options, price_value, limit=26, side=""):
        if price_value is None:
            return options, 0

        numeric = [o for o in options if isinstance(o.get("Strike"), (int, float))]

        if len(numeric) <= limit:
            return numeric, 0

        sorted_opts = sorted(numeric, key=lambda x: abs(x["Strike"] - price_value))
        pruned = sorted_opts[:limit]
        pruned_count = len(options) - len(pruned)
        return pruned, pruned_count

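    # Worked example: with price_value=100 and limit=2, strikes
    # [90, 99, 101, 120] sort by distance to 100 as [99, 101, 90, 120];
    # the two nearest (99, 101) are kept and pruned_count is 2.
    # (side is accepted for symmetry but currently unused.)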
    calls, pruned_calls = prune_nearest(calls_full, price, side="calls")
    puts, pruned_puts = prune_nearest(puts_full, price, side="puts")

    def strike_range(opts):
        strikes = [o["Strike"] for o in opts if isinstance(o.get("Strike"), (int, float))]
        return [min(strikes), max(strikes)] if strikes else [None, None]

    return {
        "stock": symbol,
        "url": url,
        "current_price": price,
        "calls": calls,
        "puts": puts,
        "calls_strike_range": strike_range(calls),
        "puts_strike_range": strike_range(puts),
        "total_calls": len(calls),
        "total_puts": len(puts),
        "pruned_calls_count": pruned_calls,
        "pruned_puts_count": pruned_puts,
    }


@app.route("/scrape_sync")
def scrape_sync():
    symbol = request.args.get("stock", "MSFT")
    app.logger.info("Received /scrape_sync request for symbol=%s", symbol)
    return jsonify(scrape_yahoo_options(symbol))


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9777)
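

# Example usage once the server is running (stock defaults to MSFT):
#
#   curl "http://localhost:9777/scrape_sync?stock=AAPL"
#
# The response is a JSON object with "calls", "puts", "current_price", the
# strike ranges, and the pruned-row counts assembled above.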