"""Flask service that scrapes Yahoo Finance options chains, Nasdaq earnings
announcements, and Yahoo Finance chart screenshots using Selenium.

Endpoints:
    /scrape_sync?stock=SYM      -- options chain for one symbol (blocking)
    /scrape_earnings            -- today's Nasdaq earnings table (blocking)
    /status                     -- last scrape status flags
    /result                     -- last options-chain result
    /scrape_sync_all_dates      -- options chain for every expiration date
    /chart_screenshot?stock=SYM -- PNG screenshot of the Yahoo chart
    /charts/<filename>          -- serve a previously captured screenshot
"""

import io
import logging
import os
import threading
import time
import urllib.parse

from bs4 import BeautifulSoup
from flask import Flask, jsonify, request, send_file, send_from_directory
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

# Scrape state shared between the worker functions and the status/result
# routes.  NOTE(review): plain module globals -- fine for a single-process
# dev server, not safe under a multi-worker WSGI deployment.
SCRAPE_STATUS = {"done": False, "error": None}
PROCESSED_DATA = {}
EARNINGS_STATUS = {"done": False, "error": None}
EARNINGS_DATA = {}
SCRAPE_STATUS_ALL_DATES = {"done": False, "error": None}

# Where chart screenshots are saved locally.
CHART_DIR = os.path.join(os.getcwd(), "charts")
os.makedirs(CHART_DIR, exist_ok=True)


def run_selenium_scrape(stock_symbol):
    """Scrape the Yahoo Finance options chain for *stock_symbol*.

    Populates the module globals PROCESSED_DATA (parsed calls/puts near the
    current price) and SCRAPE_STATUS ({"done": bool, "error": str|None}).
    Rows with zero bid AND zero ask, or a zero '% Chance', are dropped.
    """
    global SCRAPE_STATUS
    global PROCESSED_DATA
    SCRAPE_STATUS = {"done": False, "error": None}
    PROCESSED_DATA = {}
    removed_rows = []

    chrome_options = Options()
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        encoded_symbol = urllib.parse.quote(stock_symbol)
        url = f"https://finance.yahoo.com/quote/{encoded_symbol}/options/"
        driver.get(url)

        # Dismiss the cookie-consent dialog when present; absence is normal.
        try:
            consent_btn = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(text(),'Accept')]")
                )
            )
            consent_btn.click()
        except Exception:
            pass

        # The quote price span is the signal that the page finished loading.
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "span[data-testid='qsp-price']")
            )
        )
        html = driver.page_source
        soup = BeautifulSoup(html, "html.parser")

        price_span = soup.find("span", {"data-testid": "qsp-price"})
        if price_span:
            current_price = float(price_span.text.replace(",", ""))
        else:
            raise Exception("Could not find current price!")

        section = soup.find("section", {"data-testid": "options-list-table"})
        if not section:
            raise Exception("Could not find options table!")

        headers = [th.get_text(strip=True)
                   for th in section.find('thead').find_all('th')]
        rows = section.find('tbody').find_all('tr')

        all_options = []
        for row in rows:
            cols = row.find_all('td')
            row_data = {}
            for i, col in enumerate(cols):
                value = col.get_text(separator=' ', strip=True)
                header = headers[i]
                # Coerce numeric columns; unparseable cells become None.
                if header in ['Strike', 'Last Price', 'Bid', 'Ask', 'Change']:
                    try:
                        value = float(value)
                    except ValueError:
                        value = None
                elif header in ['Volume', 'Open Interest']:
                    try:
                        value = int(value)
                    except ValueError:
                        value = None
                elif header == '% Chance':
                    try:
                        value = float(value.strip('%'))
                    except Exception:
                        value = None
                if value == '-' or value == '':
                    value = None
                if value is not None:
                    row_data[header] = value

            bid = row_data.get('Bid', 0)
            ask = row_data.get('Ask', 0)
            pct_chance = row_data.get('% Chance', None)
            # Filter out dead contracts: no market (bid=ask=0) or 0% chance.
            if (pct_chance == 0) or (bid == 0 and ask == 0):
                removed_rows.append(row_data)
            elif row_data:
                all_options.append(row_data)

        # Contract names embed 'C' (call) or 'P' (put) after the date stamp.
        calls_all = sorted(
            [opt for opt in all_options if 'C' in opt.get('Contract Name', '')],
            key=lambda x: x.get('Strike', 0))
        puts_all = sorted(
            [opt for opt in all_options if 'P' in opt.get('Contract Name', '')],
            key=lambda x: x.get('Strike', 0))

        def limit_nearest(options, num, price, removed):
            """Keep the *num* strikes closest to *price*; push the rest
            onto *removed* (mutated in place)."""
            strikes = [o['Strike'] for o in options if 'Strike' in o]
            if not strikes:
                return []
            nearest_idx = min(range(len(strikes)),
                              key=lambda i: abs(strikes[i] - price))
            half = num // 2
            start = max(nearest_idx - half, 0)
            end = min(nearest_idx + half + (num % 2), len(strikes))
            kept = options[start:end]
            removed += options[:start] + options[end:]
            return kept

        calls_near = limit_nearest(calls_all, 16, current_price, removed_rows)
        puts_near = limit_nearest(puts_all, 16, current_price, removed_rows)

        def get_range(options):
            """Return [min_strike, max_strike] or [None, None] if empty."""
            strikes = [o['Strike'] for o in options if 'Strike' in o]
            if not strikes:
                return [None, None]
            return [min(strikes), max(strikes)]

        PROCESSED_DATA = {
            "stock": stock_symbol,
            "url": url,
            "current_price": current_price,
            "calls": calls_near,
            "puts": puts_near,
            "calls_strike_range": get_range(calls_near),
            "puts_strike_range": get_range(puts_near),
            "calls_strike_range_all": get_range(calls_all),
            "puts_strike_range_all": get_range(puts_all),
            "removed_count": len(removed_rows)
        }
        SCRAPE_STATUS = {"done": True, "error": None}
    except Exception as e:
        SCRAPE_STATUS = {"done": False, "error": str(e)}
    finally:
        driver.quit()


def run_earnings_scrape():
    """Scrape today's earnings table from nasdaq.com into EARNINGS_DATA.

    The Nasdaq table lives inside a <nsdq-table-sort> web component, so the
    rows must be pulled out of its shadow DOM via JavaScript.  On failure a
    screenshot and page dump are written for debugging.
    """
    global EARNINGS_STATUS
    global EARNINGS_DATA
    EARNINGS_STATUS = {"done": False, "error": None}
    EARNINGS_DATA = {}

    chrome_options = Options()
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    # chrome_options.add_argument("--headless")  # headless kept off: the
    # Nasdaq page behaves differently without a visible window.
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )
    print("[EARNINGS] Starting ChromeDriver...")
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)
    try:
        url = "https://www.nasdaq.com/market-activity/earnings"
        print(f"[EARNINGS] Navigating to: {url}")
        driver.get(url)
        try:
            consent_btn = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(text(),'Accept')]")
                )
            )
            consent_btn.click()
            print("[EARNINGS] Clicked cookie consent button.")
        except Exception:
            print("[EARNINGS] No cookie consent button found — skipping.")

        print("[EARNINGS] Locating element...")
        host = WebDriverWait(driver, 20).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "nsdq-table-sort")
            )
        )
        print("[EARNINGS] Accessing shadowRoot...")
        rows = driver.execute_script("""
            const host = arguments[0];
            const shadowRoot = host.shadowRoot;
            if (!shadowRoot) return [];
            return Array.from(shadowRoot.querySelectorAll("div[part='table-row']")).map(r => r.outerHTML);
        """, host)
        print(f"[EARNINGS] Found {len(rows)} rows in shadowRoot.")

        earnings_list = []
        for row_html in rows:
            row_soup = BeautifulSoup(row_html, "html.parser")
            cells = row_soup.select("div[part='table-cell']")
            if len(cells) >= 9:
                # Cell 0 holds a pre/post-market icon; its alt text is the
                # only machine-readable timing hint.
                time_icon = cells[0].img['alt'] if cells[0].img else ""
                symbol = cells[1].get_text(strip=True)
                company = cells[2].get_text(strip=True)
                market_cap = cells[3].get_text(strip=True)
                fiscal_qtr = cells[4].get_text(strip=True)
                consensus_eps = cells[5].get_text(strip=True)
                num_ests = cells[6].get_text(strip=True)
                last_year_date = cells[7].get_text(strip=True)
                last_year_eps = cells[8].get_text(strip=True)
                earnings_list.append({
                    "time_icon": time_icon,
                    "symbol": symbol,
                    "company": company,
                    "market_cap": market_cap,
                    "fiscal_quarter_ending": fiscal_qtr,
                    "consensus_eps_forecast": consensus_eps,
                    "number_of_estimates": num_ests,
                    "last_year_report_date": last_year_date,
                    "last_year_eps": last_year_eps
                })
        print(f"[EARNINGS] Parsed {len(earnings_list)} rows.")

        EARNINGS_DATA = {
            "url": url,
            "earnings": earnings_list
        }
        EARNINGS_STATUS = {"done": True, "error": None}
    except Exception as e:
        print(f"[EARNINGS] ERROR: {e}")
        # Dump artifacts for post-mortem debugging.
        ts = int(time.time())
        driver.save_screenshot(f"earnings_error_{ts}.png")
        with open(f"earnings_error_{ts}.html", "w", encoding="utf-8") as f:
            f.write(driver.page_source)
        EARNINGS_STATUS = {"done": False, "error": str(e)}
    finally:
        driver.quit()
        print("[EARNINGS] Closed ChromeDriver.")


@app.route('/scrape_sync', methods=['GET'])
def scrape_sync():
    """Blocking options-chain scrape; returns the parsed data or an error."""
    stock = request.args.get('stock')
    if not stock:
        return jsonify({"error": "Missing 'stock' query parameter. Example: /scrape_sync?stock=%5ESPX"}), 400
    run_selenium_scrape(stock)
    if SCRAPE_STATUS["done"]:
        return jsonify(PROCESSED_DATA)
    else:
        return jsonify({"error": SCRAPE_STATUS["error"]}), 500


@app.route('/scrape_earnings', methods=['GET'])
def scrape_earnings():
    """Blocking Nasdaq earnings scrape; returns the parsed table or an error."""
    run_earnings_scrape()
    if EARNINGS_STATUS["done"]:
        return jsonify(EARNINGS_DATA)
    else:
        return jsonify({"error": EARNINGS_STATUS["error"]}), 500


@app.route('/status', methods=['GET'])
def status():
    """Report the done/error flags of the last options and earnings scrapes."""
    return jsonify({
        "options_status": SCRAPE_STATUS,
        "earnings_status": EARNINGS_STATUS
    })


@app.route('/result', methods=['GET'])
def result():
    """Return the last successful options-chain result, if any."""
    if SCRAPE_STATUS["done"]:
        return jsonify(PROCESSED_DATA)
    else:
        return jsonify({"error": "No data available or scrape not yet complete. Run /scrape_sync?stock= first."}), 404


def parse_options_table(html):
    """Parse the options chain table HTML and return a list of option dicts.

    Numeric columns are coerced to float/int; unparseable or placeholder
    cells ('', '-') become None.  Malformed rows (wrong column count) are
    skipped.  NOTE(review): this uses the `data-test='option-chain'`
    selector, while run_selenium_scrape uses `data-testid` — Yahoo has used
    both; confirm which one the live page serves.
    """
    soup = BeautifulSoup(html, "html.parser")
    section = soup.select_one("section[data-test='option-chain']")
    if not section:
        logger.warning("Options table section not found in HTML")
        return []

    headers = [th.get_text(strip=True) for th in section.select('thead th')]
    rows = section.select('tbody tr')

    options_list = []
    for row in rows:
        cols = row.find_all('td')
        if len(cols) != len(headers):
            continue  # skip malformed row
        option_data = {}
        for i, col in enumerate(cols):
            header = headers[i]
            text = col.get_text(separator=' ', strip=True)
            # Convert numeric fields where applicable
            if header in ['Strike', 'Last Price', 'Bid', 'Ask', 'Change']:
                try:
                    text = float(text.replace(',', ''))
                except Exception:
                    text = None
            elif header in ['Volume', 'Open Interest']:
                try:
                    text = int(text.replace(',', ''))
                except Exception:
                    text = None
            elif header == '% Chance':
                try:
                    text = float(text.strip('%'))
                except Exception:
                    text = None
            elif text in ['', '-']:
                text = None
            option_data[header] = text
        options_list.append(option_data)
    return options_list


def run_selenium_scrape_per_day(stock_symbol):
    """Scrape the Yahoo options chain for EVERY expiration date.

    Iterates the expiration-date dropdown, re-opening it after each
    selection (the menu closes when a date is picked), and parses the table
    for each date.  Returns {date_label: [option dicts]}, or {} on error.
    """
    logger.info(f"Starting scrape for: {stock_symbol}")
    options = Options()
    # Comment this line to disable headless mode and see the browser window
    # options.add_argument("--headless")
    options.add_argument("--no-sandbox")
    options.add_argument("--disable-dev-shm-usage")
    options.add_argument("--window-size=1920,1080")
    options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)
    wait = WebDriverWait(driver, 20)
    try:
        encoded_symbol = urllib.parse.quote(stock_symbol)
        url = f"https://finance.yahoo.com/quote/{encoded_symbol}/options/"
        driver.get(url)

        # Accept consent if present
        try:
            consent_btn = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(text(),'Accept')]")
                )
            )
            consent_btn.click()
            logger.info("Clicked consent accept button")
        except Exception:
            logger.info("No consent button to click")

        # Wait for main price span to confirm page load
        wait.until(EC.presence_of_element_located(
            (By.CSS_SELECTOR, "span[data-testid='qsp-price']")))

        # Click expiration dropdown button
        dropdown_button = wait.until(
            EC.element_to_be_clickable(
                (By.CSS_SELECTOR, "button[data-type='date']"))
        )
        dropdown_button.click()
        logger.info("Clicked expiration date dropdown")

        # Get menu container id dynamically
        menu_id = dropdown_button.get_attribute("aria-controls")
        logger.info(f"Dropdown menu container ID: {menu_id}")

        # Wait for menu container visible
        wait.until(
            EC.visibility_of_element_located(
                (By.CSS_SELECTOR,
                 f"div#{menu_id}.dialog-container:not([aria-hidden='true'])")
            )
        )
        menu_container = driver.find_element(By.ID, menu_id)

        # Get all date option buttons
        date_buttons = menu_container.find_elements(
            By.CSS_SELECTOR, "button[data-type='date']")
        logger.info(f"Found {len(date_buttons)} expiration dates")

        all_data = {}
        for index in range(len(date_buttons)):
            # Need to reopen dropdown after first iteration, because menu
            # closes on selection; stale elements must be re-fetched too.
            if index > 0:
                dropdown_button = wait.until(
                    EC.element_to_be_clickable(
                        (By.CSS_SELECTOR, "button[data-type='date']"))
                )
                dropdown_button.click()
                wait.until(
                    EC.visibility_of_element_located(
                        (By.CSS_SELECTOR,
                         f"div#{menu_id}.dialog-container:not([aria-hidden='true'])")
                    )
                )
                menu_container = driver.find_element(By.ID, menu_id)
                date_buttons = menu_container.find_elements(
                    By.CSS_SELECTOR, "button[data-type='date']")

            date_button = date_buttons[index]
            date_value = date_button.get_attribute("title") or date_button.text
            logger.info(f"Selecting expiration date: {date_value}")

            # Use JS click to avoid any overlay issues
            driver.execute_script("arguments[0].click();", date_button)

            # Wait for options chain section to reload
            wait.until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, "section[data-test='option-chain']"))
            )
            # Small wait to allow table content to settle
            time.sleep(1)

            html = driver.page_source
            options_data = parse_options_table(html)
            logger.info(
                f"Scraped {len(options_data)} options for date {date_value}")
            all_data[date_value] = options_data

        logger.info(
            f"Completed scraping all expiration dates for {stock_symbol}")
        return all_data
    except Exception as e:
        logger.error(f"Exception during scrape: {e}", exc_info=True)
        return {}
    finally:
        driver.quit()


@app.route("/scrape_sync_all_dates")
def scrape_sync_all_dates():
    """Blocking scrape of every expiration date (defaults to ^SPX)."""
    global SCRAPE_STATUS_ALL_DATES
    SCRAPE_STATUS_ALL_DATES["done"] = False
    stock = request.args.get("stock", "^SPX")
    logger.info(f"Starting scrape for: {stock}")
    try:
        result = run_selenium_scrape_per_day(stock)
        SCRAPE_STATUS_ALL_DATES["done"] = True
        return jsonify(result)
    except Exception as e:
        SCRAPE_STATUS_ALL_DATES["error"] = str(e)
        logger.error(e, exc_info=True)
        return jsonify({"error": str(e)}), 500


@app.route("/chart_screenshot", methods=["GET"])
def chart_screenshot():
    """Capture a PNG of the Yahoo Finance chart for ?stock=SYM.

    Selects the requested range tab when found, screenshots the chart
    container (falling back to a full-page shot), saves the PNG under
    CHART_DIR, and returns its URL.
    """
    stock = request.args.get("stock")
    interval = request.args.get("interval", "5m")
    chart_range = request.args.get("range", "1D")
    timeout = int(request.args.get("timeout", "10"))
    if not stock:
        return jsonify({"error": "Missing 'stock' query parameter"}), 400

    # NOTE(review): hard-coded local Chrome profile — works only on this
    # machine; make configurable for deployment.
    user_data_dir = r"C:\Users\Rushabh\AppData\Local\Google\Chrome\SeleniumProfile"
    chrome_options = Options()
    chrome_options.add_argument(f"--user-data-dir={user_data_dir}")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--disable-blink-features=AutomationControlled")
    chrome_options.add_argument("--window-size=3840,2160")
    chrome_options.add_argument("--force-device-scale-factor=1")
    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=chrome_options
    )
    png = None
    try:
        encoded_symbol = urllib.parse.quote(stock)
        url = f"https://finance.yahoo.com/chart/{encoded_symbol}"
        logger.info(f"Navigating to: {url}")
        driver.get(url)

        # -------------------------
        # RANGE TABS (example)
        # -------------------------
        try:
            target_range = chart_range.upper()
            tab_container = WebDriverWait(driver, timeout).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, "div[data-testid='tabs-container']")
                )
            )
            buttons = tab_container.find_elements(By.TAG_NAME, "button")
            for btn in buttons:
                if btn.text.strip().upper() == target_range:
                    driver.execute_script("arguments[0].click();", btn)
                    logger.info(f"Clicked range tab: {target_range}")
                    break
        except Exception as e:
            logger.warning(f"Failed to select chart range {chart_range}: {e}")

        # -------------------------
        # SCREENSHOT
        # -------------------------
        try:
            chart = WebDriverWait(driver, timeout).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, "div[data-testid='chart-container']")
                )
            )
            WebDriverWait(driver, timeout).until(
                lambda d: chart.size['height'] > 0 and chart.size['width'] > 0
            )
            png = chart.screenshot_as_png
            logger.info("Screenshot captured from chart container")
        except Exception as e:
            logger.warning(f"Chart container not found: {e}")
            png = driver.get_screenshot_as_png()
            logger.info("Fallback full page screenshot captured")
    except Exception as e:
        logger.exception("Unhandled exception in chart_screenshot")
        return jsonify({"error": str(e)}), 500
    finally:
        driver.quit()

    # Guard against a driver that returned no image data at all.
    if png is None:
        return jsonify({"error": "Failed to capture screenshot"}), 500

    # -------------------------
    # SAVE TO FILE + RETURN URL
    # -------------------------
    filename = f"{stock}_{interval}_{chart_range}.png".replace("^", "")
    out_path = os.path.join(CHART_DIR, filename)
    with open(out_path, "wb") as f:
        f.write(png)

    # BUGFIX: the URL previously contained a literal placeholder instead of
    # the saved filename.
    file_url = f"http://{request.host}/charts/{filename}"
    return jsonify({
        "stock": stock,
        "interval": interval,
        "range": chart_range,
        "url": file_url
    })


# BUGFIX: the route must declare <filename> so Flask passes it to the view;
# the bare "/charts/" rule never supplied the argument.
@app.route("/charts/<filename>")
def serve_chart(filename):
    """Serve a previously captured chart screenshot from CHART_DIR."""
    return send_from_directory(CHART_DIR, filename)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=9777)