Initial commit with code

2025-11-29 23:34:32 -08:00
commit 4b122182a4
9 changed files with 1162 additions and 0 deletions


@@ -0,0 +1,608 @@
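# Flask micro-service that drives Chrome (Selenium + webdriver-manager) to scrape
# Yahoo Finance options chains and charts plus the Nasdaq earnings calendar, and
# exposes the results as JSON endpoints.
# Dependencies: flask, selenium, webdriver-manager, beautifulsoup4.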
import threading
from flask import Flask, jsonify, request
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
import urllib.parse
import re
app = Flask(__name__)
SCRAPE_STATUS = {"done": False, "error": None}
PROCESSED_DATA = {}
EARNINGS_STATUS = {"done": False, "error": None}
EARNINGS_DATA = {}
def run_selenium_scrape(stock_symbol):
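    """Scrape the Yahoo Finance options chain for stock_symbol into PROCESSED_DATA.

    Keeps the 16 calls and 16 puts with strikes nearest the current price; rows
    with zero bid and ask are filtered out and counted in removed_count.
    """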
global SCRAPE_STATUS
global PROCESSED_DATA
SCRAPE_STATUS = {"done": False, "error": None}
PROCESSED_DATA = {}
removed_rows = []
chrome_options = Options()
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--headless")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
encoded_symbol = urllib.parse.quote(stock_symbol)
url = f"https://finance.yahoo.com/quote/{encoded_symbol}/options/"
driver.get(url)
try:
consent_btn = WebDriverWait(driver, 5).until(
EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'Accept')]"))
)
consent_btn.click()
        except Exception:
            # The consent banner is not always shown; ignore if absent.
            pass
WebDriverWait(driver, 20).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, "span[data-testid='qsp-price']")
)
)
html = driver.page_source
soup = BeautifulSoup(html, "html.parser")
price_span = soup.find("span", {"data-testid": "qsp-price"})
if price_span:
current_price = float(price_span.text.replace(",", ""))
else:
raise Exception("Could not find current price!")
section = soup.find("section", {"data-testid": "options-list-table"})
if not section:
raise Exception("Could not find options table!")
headers = [th.get_text(strip=True) for th in section.find('thead').find_all('th')]
rows = section.find('tbody').find_all('tr')
all_options = []
for row in rows:
cols = row.find_all('td')
row_data = {}
for i, col in enumerate(cols):
value = col.get_text(separator=' ', strip=True)
header = headers[i]
if header in ['Strike', 'Last Price', 'Bid', 'Ask', 'Change']:
try:
value = float(value)
except ValueError:
value = None
elif header in ['Volume', 'Open Interest']:
try:
value = int(value)
except ValueError:
value = None
                elif header == '% Change':
                    try:
                        value = float(value.strip('%'))
                    except ValueError:
                        value = None
if value == '-' or value == '':
value = None
if value is not None:
row_data[header] = value
bid = row_data.get('Bid', 0)
ask = row_data.get('Ask', 0)
            pct_change = row_data.get('% Change', None)
            if (pct_change == 0) or (bid == 0 and ask == 0):
removed_rows.append(row_data)
elif row_data:
all_options.append(row_data)
        # OCC contract names end with <YYMMDD><C|P><8-digit strike>; match the type
        # letter positionally, since a bare "'C' in name" misfires for tickers containing C or P.
        calls_all = sorted([opt for opt in all_options if re.search(r'\d{6}C\d{8}$', opt.get('Contract Name', ''))], key=lambda x: x.get('Strike', 0))
        puts_all = sorted([opt for opt in all_options if re.search(r'\d{6}P\d{8}$', opt.get('Contract Name', ''))], key=lambda x: x.get('Strike', 0))
def limit_nearest(options, num, price, removed):
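            """Keep the num options in a window centred on the strike nearest
            to price; everything outside the window is appended to removed."""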
strikes = [o['Strike'] for o in options if 'Strike' in o]
if not strikes:
return []
nearest_idx = min(range(len(strikes)), key=lambda i: abs(strikes[i] - price))
half = num // 2
start = max(nearest_idx - half, 0)
end = min(nearest_idx + half + (num % 2), len(strikes))
kept = options[start:end]
removed += options[:start] + options[end:]
return kept
calls_near = limit_nearest(calls_all, 16, current_price, removed_rows)
puts_near = limit_nearest(puts_all, 16, current_price, removed_rows)
def get_range(options):
strikes = [o['Strike'] for o in options if 'Strike' in o]
if not strikes:
return [None, None]
return [min(strikes), max(strikes)]
PROCESSED_DATA = {
"stock": stock_symbol,
"url": url,
"current_price": current_price,
"calls": calls_near,
"puts": puts_near,
"calls_strike_range": get_range(calls_near),
"puts_strike_range": get_range(puts_near),
"calls_strike_range_all": get_range(calls_all),
"puts_strike_range_all": get_range(puts_all),
"removed_count": len(removed_rows)
}
SCRAPE_STATUS = {"done": True, "error": None}
except Exception as e:
SCRAPE_STATUS = {"done": False, "error": str(e)}
finally:
driver.quit()
def run_earnings_scrape():
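    """Scrape the Nasdaq earnings-calendar table (rendered inside a shadow DOM) into EARNINGS_DATA."""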
import time
global EARNINGS_STATUS
global EARNINGS_DATA
EARNINGS_STATUS = {"done": False, "error": None}
EARNINGS_DATA = {}
chrome_options = Options()
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
# chrome_options.add_argument("--headless")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
print("[EARNINGS] Starting ChromeDriver...")
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
try:
url = "https://www.nasdaq.com/market-activity/earnings"
print(f"[EARNINGS] Navigating to: {url}")
driver.get(url)
try:
consent_btn = WebDriverWait(driver, 5).until(
EC.element_to_be_clickable(
(By.XPATH, "//button[contains(text(),'Accept')]")
)
)
consent_btn.click()
print("[EARNINGS] Clicked cookie consent button.")
except Exception:
print("[EARNINGS] No cookie consent button found — skipping.")
print("[EARNINGS] Locating <nsdq-table-sort> element...")
host = WebDriverWait(driver, 20).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, "nsdq-table-sort")
)
)
print("[EARNINGS] Accessing shadowRoot...")
rows = driver.execute_script("""
const host = arguments[0];
const shadowRoot = host.shadowRoot;
if (!shadowRoot) return [];
return Array.from(shadowRoot.querySelectorAll("div[part='table-row']")).map(r => r.outerHTML);
""", host)
print(f"[EARNINGS] Found {len(rows)} rows in shadowRoot.")
earnings_list = []
for row_html in rows:
            # Parse each shadow-DOM row fragment with BeautifulSoup (imported at top).
            row_soup = BeautifulSoup(row_html, "html.parser")
cells = row_soup.select("div[part='table-cell']")
if len(cells) >= 9:
time_icon = cells[0].img['alt'] if cells[0].img else ""
symbol = cells[1].get_text(strip=True)
company = cells[2].get_text(strip=True)
market_cap = cells[3].get_text(strip=True)
fiscal_qtr = cells[4].get_text(strip=True)
consensus_eps = cells[5].get_text(strip=True)
num_ests = cells[6].get_text(strip=True)
last_year_date = cells[7].get_text(strip=True)
last_year_eps = cells[8].get_text(strip=True)
earnings_list.append({
"time_icon": time_icon,
"symbol": symbol,
"company": company,
"market_cap": market_cap,
"fiscal_quarter_ending": fiscal_qtr,
"consensus_eps_forecast": consensus_eps,
"number_of_estimates": num_ests,
"last_year_report_date": last_year_date,
"last_year_eps": last_year_eps
})
print(f"[EARNINGS] Parsed {len(earnings_list)} rows.")
EARNINGS_DATA = {
"url": url,
"earnings": earnings_list
}
EARNINGS_STATUS = {"done": True, "error": None}
except Exception as e:
print(f"[EARNINGS] ERROR: {e}")
ts = int(time.time())
driver.save_screenshot(f"earnings_error_{ts}.png")
with open(f"earnings_error_{ts}.html", "w", encoding="utf-8") as f:
f.write(driver.page_source)
EARNINGS_STATUS = {"done": False, "error": str(e)}
finally:
driver.quit()
print("[EARNINGS] Closed ChromeDriver.")
@app.route('/scrape_sync', methods=['GET'])
def scrape_sync():
stock = request.args.get('stock')
if not stock:
return jsonify({"error": "Missing 'stock' query parameter. Example: /scrape_sync?stock=%5ESPX"}), 400
run_selenium_scrape(stock)
if SCRAPE_STATUS["done"]:
return jsonify(PROCESSED_DATA)
else:
return jsonify({"error": SCRAPE_STATUS["error"]}), 500
@app.route('/scrape_earnings', methods=['GET'])
def scrape_earnings():
run_earnings_scrape()
if EARNINGS_STATUS["done"]:
return jsonify(EARNINGS_DATA)
else:
return jsonify({"error": EARNINGS_STATUS["error"]}), 500
@app.route('/status', methods=['GET'])
def status():
return jsonify({
"options_status": SCRAPE_STATUS,
"earnings_status": EARNINGS_STATUS
})
@app.route('/result', methods=['GET'])
def result():
if SCRAPE_STATUS["done"]:
return jsonify(PROCESSED_DATA)
else:
return jsonify({"error": "No data available or scrape not yet complete. Run /scrape_sync?stock=<SYMBOL> first."}), 404
import logging
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
SCRAPE_STATUS_ALL_DATES = {"done": False, "error": None}
def parse_options_table(html):
"""
Parse the options chain table HTML and return a list of option dicts.
You can customize this based on your original parsing logic.
"""
soup = BeautifulSoup(html, "html.parser")
section = soup.select_one("section[data-test='option-chain']")
if not section:
logger.warning("Options table section not found in HTML")
return []
headers = [th.get_text(strip=True) for th in section.select('thead th')]
rows = section.select('tbody tr')
options_list = []
for row in rows:
cols = row.find_all('td')
if len(cols) != len(headers):
continue # skip malformed row
option_data = {}
for i, col in enumerate(cols):
header = headers[i]
text = col.get_text(separator=' ', strip=True)
# Convert numeric fields where applicable
if header in ['Strike', 'Last Price', 'Bid', 'Ask', 'Change']:
try:
text = float(text.replace(',', ''))
                except ValueError:
text = None
elif header in ['Volume', 'Open Interest']:
try:
text = int(text.replace(',', ''))
                except ValueError:
text = None
            elif header == '% Change':
                try:
                    text = float(text.strip('%'))
                except ValueError:
                    text = None
elif text in ['', '-']:
text = None
option_data[header] = text
options_list.append(option_data)
return options_list
def run_selenium_scrape_per_day(stock_symbol):
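    """Scrape the options chain for every expiration date in the dropdown.

    Returns {expiration_date: [option dicts]}, or an empty dict on failure.
    """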
logger.info(f"Starting scrape for: {stock_symbol}")
options = Options()
    # Headless mode is off here so the browser window is visible while debugging;
    # uncomment the next line to run headless.
    # options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--window-size=1920,1080")
options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)
wait = WebDriverWait(driver, 20)
try:
encoded_symbol = urllib.parse.quote(stock_symbol)
url = f"https://finance.yahoo.com/quote/{encoded_symbol}/options/"
driver.get(url)
# Accept consent if present
try:
consent_btn = wait.until(
EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'Accept')]"))
)
consent_btn.click()
logger.info("Clicked consent accept button")
        except Exception:
logger.info("No consent button to click")
# Wait for main price span to confirm page load
wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='qsp-price']")))
# Click expiration dropdown button
dropdown_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-type='date']"))
)
dropdown_button.click()
logger.info("Clicked expiration date dropdown")
# Get menu container id dynamically
menu_id = dropdown_button.get_attribute("aria-controls")
logger.info(f"Dropdown menu container ID: {menu_id}")
# Wait for menu container visible
wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, f"div#{menu_id}.dialog-container:not([aria-hidden='true'])")
)
)
menu_container = driver.find_element(By.ID, menu_id)
# Get all date option buttons
date_buttons = menu_container.find_elements(By.CSS_SELECTOR, "button[data-type='date']")
logger.info(f"Found {len(date_buttons)} expiration dates")
all_data = {}
for index in range(len(date_buttons)):
# Need to reopen dropdown after first iteration, because menu closes on selection
if index > 0:
dropdown_button = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-type='date']"))
)
dropdown_button.click()
wait.until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, f"div#{menu_id}.dialog-container:not([aria-hidden='true'])")
)
)
menu_container = driver.find_element(By.ID, menu_id)
date_buttons = menu_container.find_elements(By.CSS_SELECTOR, "button[data-type='date']")
date_button = date_buttons[index]
date_value = date_button.get_attribute("title") or date_button.text
logger.info(f"Selecting expiration date: {date_value}")
# Use JS click to avoid any overlay issues
driver.execute_script("arguments[0].click();", date_button)
# Wait for options chain section to reload
wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, "section[data-test='option-chain']"))
)
# Small wait to allow table content to settle
time.sleep(1)
html = driver.page_source
options_data = parse_options_table(html)
logger.info(f"Scraped {len(options_data)} options for date {date_value}")
all_data[date_value] = options_data
logger.info(f"Completed scraping all expiration dates for {stock_symbol}")
return all_data
except Exception as e:
logger.error(f"Exception during scrape: {e}", exc_info=True)
return {}
finally:
driver.quit()
@app.route("/scrape_sync_all_dates")
def scrape_sync_all_dates():
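    """Scrape every expiration date for ?stock=... (defaults to ^SPX) and return the full chain as JSON."""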
    global SCRAPE_STATUS_ALL_DATES
    SCRAPE_STATUS_ALL_DATES = {"done": False, "error": None}
stock = request.args.get("stock", "^SPX")
logger.info(f"Starting scrape for: {stock}")
try:
result = run_selenium_scrape_per_day(stock)
SCRAPE_STATUS_ALL_DATES["done"] = True
return jsonify(result)
except Exception as e:
SCRAPE_STATUS_ALL_DATES["error"] = str(e)
logger.error(e, exc_info=True)
return jsonify({"error": str(e)}), 500
import os
from flask import send_from_directory
# Where to save charts locally
CHART_DIR = os.path.join(os.getcwd(), "charts")
os.makedirs(CHART_DIR, exist_ok=True)
@app.route("/chart_screenshot", methods=["GET"])
def chart_screenshot():
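    """Screenshot the Yahoo Finance chart for ?stock=... (optional interval, range,
    timeout query params), save it under charts/, and return its URL."""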
stock = request.args.get("stock")
interval = request.args.get("interval", "5m")
chart_range = request.args.get("range", "1D")
timeout = int(request.args.get("timeout", "10"))
if not stock:
return jsonify({"error": "Missing 'stock' query parameter"}), 400
user_data_dir = r"C:\Users\Rushabh\AppData\Local\Google\Chrome\SeleniumProfile"
chrome_options = Options()
chrome_options.add_argument(f"--user-data-dir={user_data_dir}")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument("--window-size=3840,2160")
chrome_options.add_argument("--force-device-scale-factor=1")
driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()), options=chrome_options
)
png = None
try:
encoded_symbol = urllib.parse.quote(stock)
url = f"https://finance.yahoo.com/chart/{encoded_symbol}"
logger.info(f"Navigating to: {url}")
driver.get(url)
# -------------------------
# RANGE TABS (example)
# -------------------------
try:
target_range = chart_range.upper()
tab_container = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, "div[data-testid='tabs-container']")
)
)
buttons = tab_container.find_elements(By.TAG_NAME, "button")
for btn in buttons:
if btn.text.strip().upper() == target_range:
driver.execute_script("arguments[0].click();", btn)
logger.info(f"Clicked range tab: {target_range}")
break
except Exception as e:
logger.warning(f"Failed to select chart range {chart_range}: {e}")
# -------------------------
# SCREENSHOT
# -------------------------
try:
chart = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located(
(By.CSS_SELECTOR, "div[data-testid='chart-container']")
)
)
WebDriverWait(driver, timeout).until(
lambda d: chart.size['height'] > 0 and chart.size['width'] > 0
)
png = chart.screenshot_as_png
logger.info("Screenshot captured from chart container")
except Exception as e:
logger.warning(f"Chart container not found: {e}")
png = driver.get_screenshot_as_png()
logger.info("Fallback full page screenshot captured")
except Exception as e:
logger.exception("Unhandled exception in chart_screenshot")
return jsonify({"error": str(e)}), 500
finally:
driver.quit()
# -------------------------
# SAVE TO FILE + RETURN URL
# -------------------------
filename = f"{stock}_{interval}_{chart_range}.png".replace("^", "")
out_path = os.path.join(CHART_DIR, filename)
with open(out_path, "wb") as f:
f.write(png)
file_url = f"http://{request.host}/charts/{filename}"
return jsonify({
"stock": stock,
"interval": interval,
"range": chart_range,
"url": file_url
})
# Serve saved chart files from the charts/ directory.
@app.route("/charts/<path:filename>")
def serve_chart(filename):
return send_from_directory(CHART_DIR, filename)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=9777)