# File: SimpleScraper/scraper_service.py
#
# Original listing: 410 lines, 13 KiB, Python.

from flask import Flask, jsonify, request
from playwright.sync_api import sync_playwright
from bs4 import BeautifulSoup
from datetime import datetime, timezone
import urllib.parse
import logging
import re
import time
# Flask application object; the /scrape_sync route is registered on it.
app = Flask(__name__)

# Logging: INFO and above, each record prefixed with a timestamp and level.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
app.logger.setLevel(logging.INFO)
# Date layouts accepted for a requested expiration, tried in this order.
# NOTE: "%b" / "%B" month names follow the process locale.
DATE_FORMATS = (
    "%Y-%m-%d",
    "%Y/%m/%d",
    "%Y%m%d",
    "%b %d, %Y",
    "%B %d, %Y",
)


def parse_date(value):
    """Parse *value* into a ``date`` using the first matching DATE_FORMATS entry.

    Args:
        value: Text such as "2024-01-19", "2024/01/19", "20240119",
            "Jan 19, 2024" or "January 19, 2024". Surrounding whitespace is
            ignored.

    Returns:
        A ``datetime.date``, or ``None`` when *value* is not a string or
        matches none of the known formats.
    """
    # Non-string input used to raise TypeError from strptime; treat it as
    # "not a date" like any other unparseable value.
    if not isinstance(value, str):
        return None
    text = value.strip()
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None
def normalize_label(value):
    """Return *value* lowercased, trimmed, with whitespace runs collapsed to one space."""
    # split() with no argument already discards leading/trailing whitespace
    # and treats any run of whitespace as a single separator.
    words = value.split()
    collapsed = " ".join(words)
    return collapsed.lower()
def format_expiration_label(timestamp):
    """Format a Unix timestamp (seconds) as a ``YYYY-MM-DD`` UTC date label.

    Args:
        timestamp: Seconds since the epoch.

    Returns:
        The UTC calendar date as ``"YYYY-MM-DD"``. If the value cannot be
        converted (wrong type or out of the platform's range), ``str(timestamp)``
        is returned instead so callers always get a printable label.
    """
    try:
        # Aware conversion; datetime.utcfromtimestamp() is deprecated since
        # Python 3.12 and yields the same calendar date.
        moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
        return moment.strftime("%Y-%m-%d")
    except (TypeError, ValueError, OverflowError, OSError):
        return str(timestamp)
def extract_expiration_dates_from_html(html):
    """Extract the integer entries of the first ``expirationDates`` array in *html*.

    Two spellings of the key are searched, in this order: the
    backslash-escaped form (``\\"expirationDates\\":[...]``, as found when
    JSON is embedded inside a string) and the plain form
    (``"expirationDates":[...]``). Optional whitespace around the colon is
    tolerated.

    Args:
        html: Page source text; ``None`` or an empty string yields ``[]``.

    Returns:
        A list of ints in document order. Array entries that are not plain
        digit strings are skipped. ``[]`` when no array is found.
    """
    if not html:
        return []
    patterns = (
        r'\\"expirationDates\\"\s*:\s*\[(.*?)\]',
        r'"expirationDates"\s*:\s*\[(.*?)\]',
    )
    match = None
    for pattern in patterns:
        match = re.search(pattern, html, re.DOTALL)
        if match:
            break
    if not match:
        return []
    pieces = (piece.strip() for piece in match.group(1).split(","))
    # isdecimal() is true exactly for the digit strings int() can parse, so
    # no try/except is needed (isdigit() also accepts e.g. superscripts).
    return [int(piece) for piece in pieces if piece.isdecimal()]
def build_expiration_options(expiration_dates):
    """Turn raw expiration timestamps into option dicts sorted by timestamp.

    Args:
        expiration_dates: Iterable of values convertible with ``int()``
            (Unix timestamps in seconds); ``None`` is treated as empty.
            Entries that cannot be converted are skipped.

    Returns:
        A list of ``{"value": int, "label": str, "date": date | None}`` dicts
        in ascending ``value`` order. ``label`` comes from
        ``format_expiration_label``; ``date`` is the UTC calendar date, or
        ``None`` when the timestamp is outside the representable range.
    """
    options = []
    for raw in expiration_dates or []:
        try:
            value_int = int(raw)
        except (TypeError, ValueError, OverflowError):
            continue
        try:
            # Aware conversion; datetime.utcfromtimestamp() is deprecated
            # since Python 3.12 and yields the same calendar date.
            date_value = datetime.fromtimestamp(value_int, tz=timezone.utc).date()
        except (ValueError, OverflowError, OSError):
            date_value = None
        options.append(
            {
                "value": value_int,
                "label": format_expiration_label(value_int),
                "date": date_value,
            }
        )
    return sorted(options, key=lambda opt: opt["value"])
def resolve_expiration(expiration, options):
    """Match a user-supplied expiration against the available options.

    The input is tried, in order, as:
      1. a Unix timestamp (all decimal digits) compared with each option's
         ``"value"``;
      2. a calendar date (see ``parse_date``) compared with each option's
         ``"date"``;
      3. a label, compared case- and whitespace-insensitively with each
         option's ``"label"``.

    Args:
        expiration: Requested expiration text, or ``None``.
        options: Option dicts with ``"value"``, ``"label"`` and ``"date"``
            keys; ``None`` is treated as an empty list.

    Returns:
        ``(value, label)`` of the matching option, or ``(None, None)`` when
        nothing matches. When *options* is empty and the input is numeric,
        the number is returned unverified with a label derived from it.
    """
    if not expiration:
        return None, None
    raw = expiration.strip()
    if not raw:
        return None, None
    options = options or []
    # isdecimal() rather than isdigit(): the latter accepts characters such
    # as "²" that int() rejects with ValueError.
    if raw.isdecimal():
        value = int(raw)
        if not options:
            return value, format_expiration_label(value)
        for opt in options:
            if opt.get("value") == value:
                return value, opt.get("label")
        # No timestamp matched: fall through so compact dates such as
        # "20240119" can still be resolved by the date branch below.
    requested_date = parse_date(raw)
    if requested_date:
        for opt in options:
            if opt.get("date") == requested_date:
                return opt.get("value"), opt.get("label")
        return None, None
    normalized = normalize_label(raw)
    for opt in options:
        if normalize_label(opt.get("label") or "") == normalized:
            return opt.get("value"), opt.get("label")
    return None, None
def wait_for_tables(page):
    """Wait until the page shows at least two tables and return them.

    The section-scoped selector is preferred; a bare ``table`` selector is
    the fallback. After the first table appears, the page is polled once per
    second, up to 30 times, until two or more tables are present.

    Args:
        page: Playwright sync ``Page`` (uses ``wait_for_selector``,
            ``query_selector_all`` and ``wait_for_timeout``).

    Returns:
        The list of table element handles (two or more), or ``[]`` when they
        do not show up in time.
    """
    scoped_selector = "section[data-testid='options-list-table'] table"
    any_table_selector = "table"
    max_polls = 30  # 30 * 1s = 30 seconds

    try:
        page.wait_for_selector(scoped_selector, timeout=30000)
    except Exception:
        try:
            page.wait_for_selector(any_table_selector, timeout=30000)
        except Exception:
            # No table at all: report "not found" through the return value
            # the caller already checks instead of letting the timeout escape.
            return []

    for attempt in range(max_polls):
        for selector in (scoped_selector, any_table_selector):
            tables = page.query_selector_all(selector)
            if len(tables) >= 2:
                return tables
        if attempt < max_polls - 1:
            # Let Playwright do the waiting: time.sleep() would block the
            # thread the sync API uses to process browser events.
            page.wait_for_timeout(1000)
    return []
# Table columns converted to float / int; every other column stays text.
_FLOAT_COLUMNS = ("Strike", "Last Price", "Bid", "Ask", "Change")
_INT_COLUMNS = ("Volume", "Open Interest")


def _unavailable_expiration_response(symbol, requested_expiration, expiration_options):
    """Build the error payload for an expiration that the page does not list."""
    return {
        "error": "Requested expiration not available",
        "stock": symbol,
        "requested_expiration": requested_expiration,
        "available_expirations": [
            {"label": opt.get("label"), "value": opt.get("value")}
            for opt in expiration_options
        ],
    }


def _load_expiration_options(page, url, symbol):
    """Navigate *page* to *url* and return the expiration options found in its HTML."""
    page.goto(url, wait_until="domcontentloaded", timeout=60000)
    app.logger.info("Page loaded (domcontentloaded) for %s", symbol)
    expiration_dates = extract_expiration_dates_from_html(page.content())
    return build_expiration_options(expiration_dates)


def _read_price(page, symbol):
    """Best-effort read of the current price; returns None when both selectors fail."""
    selectors = (
        "fin-streamer[data-field='regularMarketPrice']",  # primary
        "span[data-testid='qsp-price']",  # fallback
    )
    last_error = None
    for selector in selectors:
        try:
            price_text = page.locator(selector).inner_text()
            return float(price_text.replace(",", ""))
        except Exception as exc:  # deliberate best effort: price is optional
            last_error = exc
    app.logger.warning("Failed to extract price for %s: %s", symbol, last_error)
    return None


def _parse_options_table(table_html, side, symbol):
    """Parse one options table into a list of ``{header: value}`` row dicts.

    Rows whose cell count differs from the header count are skipped.
    Columns in ``_FLOAT_COLUMNS`` / ``_INT_COLUMNS`` are converted to numbers
    (``None`` when the text is not numeric); in other columns "-" and ""
    become ``None``.
    """
    if not table_html:
        app.logger.warning("No %s table HTML for %s", side, symbol)
        return []
    soup = BeautifulSoup(table_html, "html.parser")
    headers = [th.get_text(strip=True) for th in soup.select("thead th")]
    parsed = []
    for row in soup.select("tbody tr"):
        cells = row.find_all("td")
        if len(cells) != len(headers):
            continue
        item = {}
        for key, cell in zip(headers, cells):
            val = cell.get_text(" ", strip=True)
            if key in _FLOAT_COLUMNS:
                try:
                    val = float(val.replace(",", ""))
                except ValueError:
                    val = None
            elif key in _INT_COLUMNS:
                try:
                    val = int(val.replace(",", ""))
                except ValueError:
                    val = None
            elif val in ("-", ""):
                val = None
            item[key] = val
        parsed.append(item)
    app.logger.info("Parsed %d %s rows", len(parsed), side)
    return parsed


def _prune_nearest(options, price_value, limit=26):
    """Keep at most *limit* rows whose strike is closest to *price_value*.

    Returns ``(rows, removed_count)``. Without a price nothing is removed.
    Rows with a non-numeric strike are always removed and are included in
    ``removed_count``. When more than *limit* rows remain, the result is
    ordered by distance from the price.
    """
    if price_value is None:
        return options, 0
    numeric = [o for o in options if isinstance(o.get("Strike"), (int, float))]
    if len(numeric) <= limit:
        # Previously reported 0 here even when non-numeric rows were dropped.
        return numeric, len(options) - len(numeric)
    nearest = sorted(numeric, key=lambda o: abs(o["Strike"] - price_value))[:limit]
    return nearest, len(options) - len(nearest)


def _strike_range(opts):
    """Return ``[min_strike, max_strike]`` of *opts*, or ``[None, None]`` if there are none."""
    strikes = [o["Strike"] for o in opts if isinstance(o.get("Strike"), (int, float))]
    return [min(strikes), max(strikes)] if strikes else [None, None]


def scrape_yahoo_options(symbol, expiration=None):
    """Scrape the Yahoo Finance options page for *symbol*.

    Args:
        symbol: Ticker symbol; it is URL-quoted into the page address.
        expiration: Optional requested expiration. All-digit text is used as
            a Unix timestamp, text that ``parse_date`` understands is
            converted to midnight UTC of that day, and anything else is
            resolved against the expirations listed on the page via
            ``resolve_expiration``.

    Returns:
        On success, a dict with keys ``stock``, ``url``,
        ``requested_expiration``, ``selected_expiration`` (``value`` and
        ``label``), ``current_price``, ``calls``, ``puts``,
        ``calls_strike_range``, ``puts_strike_range``, ``total_calls``,
        ``total_puts``, ``pruned_calls_count`` and ``pruned_puts_count``.
        On failure, a dict with an ``error`` key: either the requested
        expiration is not listed (``available_expirations`` is included) or
        the two option tables could not be located.
    """
    encoded = urllib.parse.quote(symbol, safe="")
    base_url = f"https://finance.yahoo.com/quote/{encoded}/options/"
    requested_expiration = expiration.strip() if expiration else None
    if not requested_expiration:
        requested_expiration = None

    app.logger.info(
        "Starting scrape for symbol=%s expiration=%s url=%s",
        symbol,
        requested_expiration,
        base_url,
    )

    # Work out the target timestamp before any browser is started.
    target_date = None
    selected_expiration_value = None
    selected_expiration_label = None
    resolve_on_page = False
    if requested_expiration:
        # isdecimal() rather than isdigit(): the latter accepts characters
        # such as "²" that make int() raise ValueError.
        # NOTE: all-digit input is always taken as a timestamp, so compact
        # "YYYYMMDD" dates never reach parse_date() from here.
        if requested_expiration.isdecimal():
            target_date = int(requested_expiration)
        else:
            parsed_date = parse_date(requested_expiration)
            if parsed_date:
                target_date = int(
                    datetime(
                        parsed_date.year,
                        parsed_date.month,
                        parsed_date.day,
                        tzinfo=timezone.utc,
                    ).timestamp()
                )
            else:
                # Not a number or a date: match it against the page's list.
                resolve_on_page = True
        if target_date is not None:
            selected_expiration_value = target_date
            selected_expiration_label = format_expiration_label(target_date)

    url = f"{base_url}?date={target_date}" if target_date else base_url

    with sync_playwright() as playwright:
        browser = playwright.chromium.launch(headless=True)
        try:
            # Page setup lives inside the try so the browser is closed even
            # if it fails.
            page = browser.new_page()
            page.set_extra_http_headers(
                {
                    "User-Agent": (
                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36"
                    )
                }
            )
            page.set_default_timeout(60000)

            expiration_options = _load_expiration_options(page, url, symbol)

            if resolve_on_page:
                resolved_value, resolved_label = resolve_expiration(
                    requested_expiration, expiration_options
                )
                if resolved_value is None:
                    return _unavailable_expiration_response(
                        symbol, requested_expiration, expiration_options
                    )
                target_date = resolved_value
                selected_expiration_value = resolved_value
                selected_expiration_label = resolved_label or format_expiration_label(
                    resolved_value
                )
                # Reload the page for the expiration that was resolved.
                url = f"{base_url}?date={resolved_value}"
                expiration_options = _load_expiration_options(page, url, symbol)

            if target_date and expiration_options:
                matched = next(
                    (opt for opt in expiration_options if opt.get("value") == target_date),
                    None,
                )
                if matched is None:
                    return _unavailable_expiration_response(
                        symbol, requested_expiration, expiration_options
                    )
                selected_expiration_label = matched.get("label")
            elif expiration_options and not target_date:
                # Nothing requested: report the earliest listed expiration.
                selected_expiration_value = expiration_options[0].get("value")
                selected_expiration_label = expiration_options[0].get("label")

            app.logger.info("Waiting for options tables...")
            tables = wait_for_tables(page)
            if len(tables) < 2:
                app.logger.error(
                    "Only %d tables found; expected 2. HTML may have changed.",
                    len(tables),
                )
                return {"error": "Could not locate options tables", "stock": symbol}

            app.logger.info("Found %d tables. Extracting Calls & Puts.", len(tables))
            # The first table is read as calls, the second as puts.
            calls_html = tables[0].evaluate("el => el.outerHTML")
            puts_html = tables[1].evaluate("el => el.outerHTML")

            price = _read_price(page, symbol)
            app.logger.info("Current price for %s = %s", symbol, price)
        finally:
            browser.close()

    calls_full = _parse_options_table(calls_html, "calls", symbol)
    puts_full = _parse_options_table(puts_html, "puts", symbol)

    calls, pruned_calls = _prune_nearest(calls_full, price)
    puts, pruned_puts = _prune_nearest(puts_full, price)

    return {
        "stock": symbol,
        "url": url,
        "requested_expiration": requested_expiration,
        "selected_expiration": {
            "value": selected_expiration_value,
            "label": selected_expiration_label,
        },
        "current_price": price,
        "calls": calls,
        "puts": puts,
        "calls_strike_range": _strike_range(calls),
        "puts_strike_range": _strike_range(puts),
        "total_calls": len(calls),
        "total_puts": len(puts),
        "pruned_calls_count": pruned_calls,
        "pruned_puts_count": pruned_puts,
    }
@app.route("/scrape_sync")
def scrape_sync():
    """Handle /scrape_sync: scrape one option chain and return it as JSON.

    Query parameters:
        stock: Ticker symbol; "MSFT" is used when the parameter is absent.
        expiration / expiry / date: Requested expiration; checked in that
            order and the first non-empty value is used.
    """
    query = request.args
    symbol = query.get("stock", "MSFT")
    expiration = query.get("expiration") or query.get("expiry") or query.get("date")
    app.logger.info(
        "Received /scrape_sync request for symbol=%s expiration=%s",
        symbol,
        expiration,
    )
    result = scrape_yahoo_options(symbol, expiration)
    return jsonify(result)
# Start Flask's built-in server only when this file is executed as a script.
if __name__ == "__main__":
    # host="0.0.0.0" listens on all network interfaces; the port is 9777.
    app.run(host="0.0.0.0", port=9777)