# SimpleScraper/scraper_service.py
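"""Scrape the Yahoo Finance options page for a ticker.

A small Flask service: Playwright drives headless Chromium to render the
page, BeautifulSoup parses the Calls/Puts tables, and the result is
returned as JSON.
"""
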
from flask import Flask, jsonify, request
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
import urllib.parse
import logging
import time

app = Flask(__name__)

# Logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
app.logger.setLevel(logging.INFO)

def scrape_yahoo_options(symbol):
    encoded = urllib.parse.quote(symbol, safe="")
    url = f"https://finance.yahoo.com/quote/{encoded}/options/"
    app.logger.info("Starting scrape for symbol=%s url=%s", symbol, url)

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        # Present a realistic desktop User-Agent so Yahoo serves the full page.
        page.set_extra_http_headers({
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
            )
        })
        page.goto(url, wait_until="domcontentloaded", timeout=60000)
        app.logger.info("Page loaded (domcontentloaded) for %s", symbol)

        # Yahoo periodically changes its class names, so we don't depend on
        # them: simply wait until at least TWO <table> elements appear
        # (Calls and Puts).
        app.logger.info("Waiting for options tables...")
        # Wait for any table to exist
        page.wait_for_selector("table", timeout=30000)
        # Poll until a second table appears
        for _ in range(30):  # 30 x 1s = 30 seconds
            tables = page.query_selector_all("table")
            if len(tables) >= 2:
                break
            time.sleep(1)
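        # (Alternative sketch, not used here: Playwright can do this poll
        # natively, e.g.
        #   page.wait_for_function(
        #       "document.querySelectorAll('table').length >= 2", timeout=30000)
        # which avoids the manual sleep loop.)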
        tables = page.query_selector_all("table")
        if len(tables) < 2:
            app.logger.error("Only %d tables found (expected 2). HTML changed?", len(tables))
            browser.close()
            return {"error": "Could not locate options tables", "stock": symbol}

        app.logger.info("Found %d tables. Extracting Calls & Puts.", len(tables))
        calls_html = tables[0].evaluate("el => el.outerHTML")
        puts_html = tables[1].evaluate("el => el.outerHTML")
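        # NOTE: the two assignments above assume the first table on the page
        # is Calls and the second is Puts; if Yahoo ever reorders them, these
        # need to swap.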

        # --- Extract current price ---
        price = None
        try:
            # Primary selector
            price_text = page.locator("fin-streamer[data-field='regularMarketPrice']").inner_text()
            price = float(price_text.replace(",", ""))
        except Exception:
            try:
                # Fallback selector
                price_text = page.locator("span[data-testid='qsp-price']").inner_text()
                price = float(price_text.replace(",", ""))
            except Exception as e:
                app.logger.warning("Failed to extract price for %s: %s", symbol, e)
        app.logger.info("Current price for %s = %s", symbol, price)

        browser.close()

    # ----------------------------------------------------------------------
    # Parsing table HTML
    # ----------------------------------------------------------------------
    def parse_table(table_html, side):
        if not table_html:
            app.logger.warning("No %s table HTML for %s", side, symbol)
            return []
        soup = BeautifulSoup(table_html, "html.parser")
        headers = [th.get_text(strip=True) for th in soup.select("thead th")]
        rows = soup.select("tbody tr")
        parsed = []
        for r in rows:
            tds = r.find_all("td")
            if len(tds) != len(headers):
                continue
            item = {}
            for i, c in enumerate(tds):
                key = headers[i]
                val = c.get_text(" ", strip=True)
                # Convert numeric fields
                if key in ["Strike", "Last Price", "Bid", "Ask", "Change"]:
                    try:
                        val = float(val.replace(",", ""))
                    except ValueError:
                        val = None
                elif key in ["Volume", "Open Interest"]:
                    try:
                        val = int(val.replace(",", ""))
                    except ValueError:
                        val = None
                elif val in ["-", ""]:
                    val = None
                item[key] = val
            parsed.append(item)
        app.logger.info("Parsed %d %s rows", len(parsed), side)
        return parsed

    calls_full = parse_table(calls_html, "calls")
    puts_full = parse_table(puts_html, "puts")

    # ----------------------------------------------------------------------
    # Pruning logic: keep only the strikes nearest the current price
    # ----------------------------------------------------------------------
    def prune_nearest(options, price_value, limit=26, side=""):
        if price_value is None:
            return options, 0
        numeric = [o for o in options if isinstance(o.get("Strike"), (int, float))]
        if len(numeric) <= limit:
            # Rows with non-numeric strikes are still dropped; count them.
            return numeric, len(options) - len(numeric)
        sorted_opts = sorted(numeric, key=lambda x: abs(x["Strike"] - price_value))
        pruned = sorted_opts[:limit]
        pruned_count = len(options) - len(pruned)
        return pruned, pruned_count
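
    # Example: with limit=26 and a current price of 100.0, a chain with
    # strikes from 50 to 150 in $5 steps (21 rows) is returned whole, while
    # a $1-step chain (101 rows) keeps only the 26 strikes nearest 100.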
    calls, pruned_calls = prune_nearest(calls_full, price, side="calls")
    puts, pruned_puts = prune_nearest(puts_full, price, side="puts")

    def strike_range(opts):
        strikes = [o["Strike"] for o in opts if isinstance(o.get("Strike"), (int, float))]
        return [min(strikes), max(strikes)] if strikes else [None, None]

    return {
        "stock": symbol,
        "url": url,
        "current_price": price,
        "calls": calls,
        "puts": puts,
        "calls_strike_range": strike_range(calls),
        "puts_strike_range": strike_range(puts),
        "total_calls": len(calls),
        "total_puts": len(puts),
        "pruned_calls_count": pruned_calls,
        "pruned_puts_count": pruned_puts,
    }

@app.route("/scrape_sync")
def scrape_sync():
    symbol = request.args.get("stock", "MSFT")
    app.logger.info("Received /scrape_sync request for symbol=%s", symbol)
    return jsonify(scrape_yahoo_options(symbol))


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9777)
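
# Example request once the service is running (hypothetical ticker):
#   curl "http://localhost:9777/scrape_sync?stock=AAPL"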