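"""Flask service that scrapes Yahoo Finance options chains, the Nasdaq
earnings calendar, and Yahoo Finance chart screenshots using Selenium.

Run this module directly to start the server on port 9777 and use the
routes defined below.
"""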
import logging
import os
import re
import time
import urllib.parse

from flask import Flask, jsonify, request, send_from_directory

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup

app = Flask(__name__)

# Module-level status/result holders; each scrape run overwrites them.
SCRAPE_STATUS = {"done": False, "error": None}
PROCESSED_DATA = {}

EARNINGS_STATUS = {"done": False, "error": None}
EARNINGS_DATA = {}

def run_selenium_scrape(stock_symbol):
    """Scrape the Yahoo Finance options chain for `stock_symbol` and store
    the parsed result in PROCESSED_DATA / SCRAPE_STATUS."""
    global SCRAPE_STATUS
    global PROCESSED_DATA

    SCRAPE_STATUS = {"done": False, "error": None}
    PROCESSED_DATA = {}

    removed_rows = []

    chrome_options = Options()
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        encoded_symbol = urllib.parse.quote(stock_symbol)
        url = f"https://finance.yahoo.com/quote/{encoded_symbol}/options/"
        driver.get(url)

        # Dismiss the cookie-consent banner if one appears.
        try:
            consent_btn = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'Accept')]"))
            )
            consent_btn.click()
        except Exception:
            pass

        # Wait for the quote price to render, confirming the page loaded.
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "span[data-testid='qsp-price']")
            )
        )

        html = driver.page_source
        soup = BeautifulSoup(html, "html.parser")

        price_span = soup.find("span", {"data-testid": "qsp-price"})
        if price_span:
            current_price = float(price_span.text.replace(",", ""))
        else:
            raise Exception("Could not find current price!")

        section = soup.find("section", {"data-testid": "options-list-table"})
        if not section:
            raise Exception("Could not find options table!")

        headers = [th.get_text(strip=True) for th in section.find('thead').find_all('th')]
        rows = section.find('tbody').find_all('tr')

        all_options = []
        for row in rows:
            cols = row.find_all('td')
            row_data = {}
            for i, col in enumerate(cols):
                value = col.get_text(separator=' ', strip=True)
                header = headers[i]

                # Strip thousands separators before numeric conversion.
                if header in ['Strike', 'Last Price', 'Bid', 'Ask', 'Change']:
                    try:
                        value = float(value.replace(',', ''))
                    except ValueError:
                        value = None
                elif header in ['Volume', 'Open Interest']:
                    try:
                        value = int(value.replace(',', ''))
                    except ValueError:
                        value = None
                elif header == '% Change':
                    try:
                        value = float(value.strip('%'))
                    except ValueError:
                        value = None

                if value == '-' or value == '':
                    value = None

                if value is not None:
                    row_data[header] = value

            bid = row_data.get('Bid', 0)
            ask = row_data.get('Ask', 0)
            pct_change = row_data.get('% Change', None)

            # Drop contracts with no quote activity: a 0% change or no bid/ask.
            if (pct_change == 0) or (bid == 0 and ask == 0):
                removed_rows.append(row_data)
            elif row_data:
                all_options.append(row_data)

        # Contract names follow the OCC convention: root + expiry + C/P +
        # 8-digit strike (e.g. AAPL250620C00200000), so match the type letter
        # in that position rather than searching the whole symbol, which
        # would misclassify tickers that contain a C or P.
        calls_all = sorted(
            [opt for opt in all_options if re.search(r'C\d{8}$', opt.get('Contract Name', ''))],
            key=lambda x: x.get('Strike', 0)
        )
        puts_all = sorted(
            [opt for opt in all_options if re.search(r'P\d{8}$', opt.get('Contract Name', ''))],
            key=lambda x: x.get('Strike', 0)
        )

        def limit_nearest(options, num, price, removed):
            """Keep the `num` strikes closest to `price` (options are sorted
            by strike); everything outside the window is appended to `removed`."""
            strikes = [o['Strike'] for o in options if 'Strike' in o]
            if not strikes:
                return []
            nearest_idx = min(range(len(strikes)), key=lambda i: abs(strikes[i] - price))
            half = num // 2

            start = max(nearest_idx - half, 0)
            end = min(nearest_idx + half + (num % 2), len(strikes))

            kept = options[start:end]
            removed += options[:start] + options[end:]
            return kept
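        # Windowing example (illustrative): with 40 strikes, num=16 and the
        # nearest strike at index 20, limit_nearest keeps indices 12..27 and
        # moves the other 24 rows into `removed`.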
        calls_near = limit_nearest(calls_all, 16, current_price, removed_rows)
        puts_near = limit_nearest(puts_all, 16, current_price, removed_rows)

        def get_range(options):
            """Return the [min, max] strike of `options`, or [None, None] if empty."""
            strikes = [o['Strike'] for o in options if 'Strike' in o]
            if not strikes:
                return [None, None]
            return [min(strikes), max(strikes)]

        PROCESSED_DATA = {
            "stock": stock_symbol,
            "url": url,
            "current_price": current_price,
            "calls": calls_near,
            "puts": puts_near,
            "calls_strike_range": get_range(calls_near),
            "puts_strike_range": get_range(puts_near),
            "calls_strike_range_all": get_range(calls_all),
            "puts_strike_range_all": get_range(puts_all),
            "removed_count": len(removed_rows)
        }

        SCRAPE_STATUS = {"done": True, "error": None}

    except Exception as e:
        SCRAPE_STATUS = {"done": False, "error": str(e)}

    finally:
        driver.quit()

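# Direct (non-HTTP) usage, for reference; the symbol is illustrative:
#   run_selenium_scrape("AAPL")
#   print(SCRAPE_STATUS, PROCESSED_DATA.get("current_price"))
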
def run_earnings_scrape():
    """Scrape the Nasdaq earnings calendar and store the parsed rows in
    EARNINGS_DATA / EARNINGS_STATUS."""
    global EARNINGS_STATUS
    global EARNINGS_DATA

    EARNINGS_STATUS = {"done": False, "error": None}
    EARNINGS_DATA = {}

    chrome_options = Options()
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )

    print("[EARNINGS] Starting ChromeDriver...")
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        url = "https://www.nasdaq.com/market-activity/earnings"
        print(f"[EARNINGS] Navigating to: {url}")
        driver.get(url)

        try:
            consent_btn = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(text(),'Accept')]")
                )
            )
            consent_btn.click()
            print("[EARNINGS] Clicked cookie consent button.")
        except Exception:
            print("[EARNINGS] No cookie consent button found — skipping.")

        print("[EARNINGS] Locating <nsdq-table-sort> element...")
        host = WebDriverWait(driver, 20).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "nsdq-table-sort")
            )
        )

        # The table rows live inside a shadow DOM, which Selenium's normal
        # selectors cannot reach, so pull each row's HTML out via JavaScript.
        print("[EARNINGS] Accessing shadowRoot...")
        rows = driver.execute_script("""
            const host = arguments[0];
            const shadowRoot = host.shadowRoot;
            if (!shadowRoot) return [];
            return Array.from(shadowRoot.querySelectorAll("div[part='table-row']")).map(r => r.outerHTML);
        """, host)

print(f"[EARNINGS] Found {len(rows)} rows in shadowRoot.")
|
|
|
|
earnings_list = []
|
|
|
|
for row_html in rows:
|
|
# parse using BeautifulSoup
|
|
from bs4 import BeautifulSoup
|
|
row_soup = BeautifulSoup(row_html, "html.parser")
|
|
cells = row_soup.select("div[part='table-cell']")
|
|
if len(cells) >= 9:
|
|
time_icon = cells[0].img['alt'] if cells[0].img else ""
|
|
symbol = cells[1].get_text(strip=True)
|
|
company = cells[2].get_text(strip=True)
|
|
market_cap = cells[3].get_text(strip=True)
|
|
fiscal_qtr = cells[4].get_text(strip=True)
|
|
consensus_eps = cells[5].get_text(strip=True)
|
|
num_ests = cells[6].get_text(strip=True)
|
|
last_year_date = cells[7].get_text(strip=True)
|
|
last_year_eps = cells[8].get_text(strip=True)
|
|
|
|
earnings_list.append({
|
|
"time_icon": time_icon,
|
|
"symbol": symbol,
|
|
"company": company,
|
|
"market_cap": market_cap,
|
|
"fiscal_quarter_ending": fiscal_qtr,
|
|
"consensus_eps_forecast": consensus_eps,
|
|
"number_of_estimates": num_ests,
|
|
"last_year_report_date": last_year_date,
|
|
"last_year_eps": last_year_eps
|
|
})
|
|
|
|
print(f"[EARNINGS] Parsed {len(earnings_list)} rows.")
|
|
EARNINGS_DATA = {
|
|
"url": url,
|
|
"earnings": earnings_list
|
|
}
|
|
EARNINGS_STATUS = {"done": True, "error": None}
|
|
|
|
except Exception as e:
|
|
print(f"[EARNINGS] ERROR: {e}")
|
|
ts = int(time.time())
|
|
driver.save_screenshot(f"earnings_error_{ts}.png")
|
|
with open(f"earnings_error_{ts}.html", "w", encoding="utf-8") as f:
|
|
f.write(driver.page_source)
|
|
EARNINGS_STATUS = {"done": False, "error": str(e)}
|
|
|
|
finally:
|
|
driver.quit()
|
|
print("[EARNINGS] Closed ChromeDriver.")
|
|
|
|
|
|
|
|
@app.route('/scrape_sync', methods=['GET'])
def scrape_sync():
    """Scrape the options chain for ?stock=<SYMBOL> and return it."""
    stock = request.args.get('stock')
    if not stock:
        return jsonify({"error": "Missing 'stock' query parameter. Example: /scrape_sync?stock=%5ESPX"}), 400

    run_selenium_scrape(stock)
    if SCRAPE_STATUS["done"]:
        return jsonify(PROCESSED_DATA)
    else:
        return jsonify({"error": SCRAPE_STATUS["error"]}), 500


@app.route('/scrape_earnings', methods=['GET'])
def scrape_earnings():
    """Scrape the Nasdaq earnings calendar and return it."""
    run_earnings_scrape()
    if EARNINGS_STATUS["done"]:
        return jsonify(EARNINGS_DATA)
    else:
        return jsonify({"error": EARNINGS_STATUS["error"]}), 500


@app.route('/status', methods=['GET'])
def status():
    """Report whether the last options/earnings scrapes completed."""
    return jsonify({
        "options_status": SCRAPE_STATUS,
        "earnings_status": EARNINGS_STATUS
    })


@app.route('/result', methods=['GET'])
def result():
    """Return the most recent options scrape result, if any."""
    if SCRAPE_STATUS["done"]:
        return jsonify(PROCESSED_DATA)
    else:
        return jsonify({"error": "No data available or scrape not yet complete. Run /scrape_sync?stock=<SYMBOL> first."}), 404
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

SCRAPE_STATUS_ALL_DATES = {"done": False, "error": None}

def parse_options_table(html):
    """Parse the options chain table HTML and return a list of option dicts."""
    soup = BeautifulSoup(html, "html.parser")
    section = soup.select_one("section[data-test='option-chain']")
    if not section:
        logger.warning("Options table section not found in HTML")
        return []

    headers = [th.get_text(strip=True) for th in section.select('thead th')]
    rows = section.select('tbody tr')

    options_list = []
    for row in rows:
        cols = row.find_all('td')
        if len(cols) != len(headers):
            continue  # skip malformed row

        option_data = {}
        for i, col in enumerate(cols):
            header = headers[i]
            text = col.get_text(separator=' ', strip=True)
            # Convert numeric fields where applicable.
            if header in ['Strike', 'Last Price', 'Bid', 'Ask', 'Change']:
                try:
                    text = float(text.replace(',', ''))
                except ValueError:
                    text = None
            elif header in ['Volume', 'Open Interest']:
                try:
                    text = int(text.replace(',', ''))
                except ValueError:
                    text = None
            elif header == '% Change':
                try:
                    text = float(text.strip('%'))
                except ValueError:
                    text = None
            elif text in ['', '-']:
                text = None

            option_data[header] = text
        options_list.append(option_data)
    return options_list
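
# Rows returned by parse_options_table look like (values illustrative):
#   {"Contract Name": "AAPL250620C00200000", "Strike": 200.0, "Bid": 12.3,
#    "Ask": 12.9, "Volume": 150, "Open Interest": 1200, "% Change": 4.1, ...}
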
def run_selenium_scrape_per_day(stock_symbol):
    """Scrape the Yahoo Finance options chain for every available
    expiration date and return {date: [option dicts]}."""
    logger.info(f"Starting scrape for: {stock_symbol}")

    options = Options()
    # Uncomment to run headless; leave commented out to watch the browser window.
    # options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--window-size=1920,1080")
    options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)
    wait = WebDriverWait(driver, 20)

    try:
        encoded_symbol = urllib.parse.quote(stock_symbol)
        url = f"https://finance.yahoo.com/quote/{encoded_symbol}/options/"
        driver.get(url)

        # Accept consent if present
        try:
            consent_btn = wait.until(
                EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'Accept')]"))
            )
            consent_btn.click()
            logger.info("Clicked consent accept button")
        except Exception:
            logger.info("No consent button to click")

        # Wait for main price span to confirm page load
        wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "span[data-testid='qsp-price']")))

        # Click expiration dropdown button
        dropdown_button = wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-type='date']"))
        )
        dropdown_button.click()
        logger.info("Clicked expiration date dropdown")

        # Get menu container id dynamically
        menu_id = dropdown_button.get_attribute("aria-controls")
        logger.info(f"Dropdown menu container ID: {menu_id}")

        # Wait for menu container to become visible
        wait.until(
            EC.visibility_of_element_located(
                (By.CSS_SELECTOR, f"div#{menu_id}.dialog-container:not([aria-hidden='true'])")
            )
        )
        menu_container = driver.find_element(By.ID, menu_id)

        # Get all date option buttons
        date_buttons = menu_container.find_elements(By.CSS_SELECTOR, "button[data-type='date']")
        logger.info(f"Found {len(date_buttons)} expiration dates")

        all_data = {}

        for index in range(len(date_buttons)):
            # Reopen the dropdown after the first iteration: the menu closes
            # (and its buttons go stale) once a date is selected.
            if index > 0:
                dropdown_button = wait.until(
                    EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-type='date']"))
                )
                dropdown_button.click()
                wait.until(
                    EC.visibility_of_element_located(
                        (By.CSS_SELECTOR, f"div#{menu_id}.dialog-container:not([aria-hidden='true'])")
                    )
                )
                menu_container = driver.find_element(By.ID, menu_id)
                date_buttons = menu_container.find_elements(By.CSS_SELECTOR, "button[data-type='date']")

            date_button = date_buttons[index]
            date_value = date_button.get_attribute("title") or date_button.text
            logger.info(f"Selecting expiration date: {date_value}")

            # Use a JS click to avoid any overlay issues
            driver.execute_script("arguments[0].click();", date_button)

            # Wait for the options chain section to reload
            wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "section[data-test='option-chain']"))
            )

            # Small wait to allow table content to settle
            time.sleep(1)

            html = driver.page_source
            options_data = parse_options_table(html)
            logger.info(f"Scraped {len(options_data)} options for date {date_value}")

            all_data[date_value] = options_data

        logger.info(f"Completed scraping all expiration dates for {stock_symbol}")
        return all_data

    except Exception as e:
        logger.error(f"Exception during scrape: {e}", exc_info=True)
        return {}
    finally:
        driver.quit()

@app.route("/scrape_sync_all_dates")
|
|
def scrape_sync_all_dates():
|
|
global SCRAPE_STATUS_ALL_DATES
|
|
SCRAPE_STATUS_ALL_DATES["done"] = False
|
|
stock = request.args.get("stock", "^SPX")
|
|
logger.info(f"Starting scrape for: {stock}")
|
|
try:
|
|
result = run_selenium_scrape_per_day(stock)
|
|
SCRAPE_STATUS_ALL_DATES["done"] = True
|
|
return jsonify(result)
|
|
except Exception as e:
|
|
SCRAPE_STATUS_ALL_DATES["error"] = str(e)
|
|
logger.error(e, exc_info=True)
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
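
# Example request (illustrative), scraping every expiration for the S&P 500 index:
#   curl "http://localhost:9777/scrape_sync_all_dates?stock=%5ESPX"
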
# Where to save charts locally
CHART_DIR = os.path.join(os.getcwd(), "charts")
os.makedirs(CHART_DIR, exist_ok=True)

@app.route("/chart_screenshot", methods=["GET"])
|
|
def chart_screenshot():
|
|
stock = request.args.get("stock")
|
|
interval = request.args.get("interval", "5m")
|
|
chart_range = request.args.get("range", "1D")
|
|
timeout = int(request.args.get("timeout", "10"))
|
|
|
|
if not stock:
|
|
return jsonify({"error": "Missing 'stock' query parameter"}), 400
|
|
|
|
user_data_dir = r"C:\Users\Rushabh\AppData\Local\Google\Chrome\SeleniumProfile"
|
|
|
|
chrome_options = Options()
|
|
chrome_options.add_argument(f"--user-data-dir={user_data_dir}")
|
|
chrome_options.add_argument("--no-sandbox")
|
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
|
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
|
chrome_options.add_argument("--window-size=3840,2160")
|
|
chrome_options.add_argument("--force-device-scale-factor=1")
|
|
|
|
driver = webdriver.Chrome(
|
|
service=Service(ChromeDriverManager().install()), options=chrome_options
|
|
)
|
|
|
|
    png = None
    try:
        encoded_symbol = urllib.parse.quote(stock)
        url = f"https://finance.yahoo.com/chart/{encoded_symbol}"
        logger.info(f"Navigating to: {url}")
        driver.get(url)

        # -------------------------
        # RANGE TABS
        # -------------------------
        try:
            target_range = chart_range.upper()
            tab_container = WebDriverWait(driver, timeout).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, "div[data-testid='tabs-container']")
                )
            )
            buttons = tab_container.find_elements(By.TAG_NAME, "button")
            for btn in buttons:
                if btn.text.strip().upper() == target_range:
                    driver.execute_script("arguments[0].click();", btn)
                    logger.info(f"Clicked range tab: {target_range}")
                    break
        except Exception as e:
            logger.warning(f"Failed to select chart range {chart_range}: {e}")

        # -------------------------
        # SCREENSHOT
        # -------------------------
        try:
            chart = WebDriverWait(driver, timeout).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, "div[data-testid='chart-container']")
                )
            )
            # Wait until the chart element has a non-zero size before capturing.
            WebDriverWait(driver, timeout).until(
                lambda d: chart.size['height'] > 0 and chart.size['width'] > 0
            )
            png = chart.screenshot_as_png
            logger.info("Screenshot captured from chart container")
        except Exception as e:
            logger.warning(f"Chart container not found: {e}")
            png = driver.get_screenshot_as_png()
            logger.info("Fallback full page screenshot captured")

    except Exception as e:
        logger.exception("Unhandled exception in chart_screenshot")
        return jsonify({"error": str(e)}), 500
    finally:
        driver.quit()

    # -------------------------
    # SAVE TO FILE + RETURN URL
    # -------------------------
    filename = f"{stock}_{interval}_{chart_range}.png".replace("^", "")
    out_path = os.path.join(CHART_DIR, filename)

    with open(out_path, "wb") as f:
        f.write(png)

    file_url = f"http://{request.host}/charts/{filename}"

    return jsonify({
        "stock": stock,
        "interval": interval,
        "range": chart_range,
        "url": file_url
    })

# Serve saved chart files from /charts
@app.route("/charts/<path:filename>")
def serve_chart(filename):
    return send_from_directory(CHART_DIR, filename)
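
# Example request (illustrative): capture a 1D chart for AAPL, then fetch the
# saved PNG via the URL in the response (filename follows stock_interval_range):
#   curl "http://localhost:9777/chart_screenshot?stock=AAPL&range=1D"
#   curl -O "http://localhost:9777/charts/AAPL_5m_1D.png"
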
if __name__ == "__main__":
|
|
app.run(host="0.0.0.0", port=9777) |