import threading

from flask import Flask, jsonify
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup

app = Flask(__name__)

# Global variables to store scrape status and processed data
SCRAPE_STATUS = {"done": False, "error": None}
PROCESSED_DATA = []

def run_selenium_scrape():
    global SCRAPE_STATUS
    global PROCESSED_DATA
    SCRAPE_STATUS = {"done": False, "error": None}
    PROCESSED_DATA = []  # Clear previous data

    chrome_options = Options()
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--start-maximized")
    # HEADFUL: do NOT use --headless

    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    )

    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=chrome_options)

    try:
        driver.get("https://finance.yahoo.com/quote/%5ESPX/options/")

        # Optional: click accept on consent popup if present
        try:
            consent_btn = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[contains(text(),'Accept')]")
                )
            )
            consent_btn.click()
        except TimeoutException:
            pass  # No consent popup, ignore

        # Wait for the options table
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "section[data-testid='options-list-table']")
            )
        )

        html = driver.page_source
        soup = BeautifulSoup(html, "html.parser")

        section = soup.find("section", {"data-testid": "options-list-table"})
        if section:
            # Extract headers
            headers = [th.get_text(strip=True) for th in section.find("thead").find_all("th")]

            # Extract rows
            rows = section.find("tbody").find_all("tr")

            cleaned_data = []
            for row in rows:
                cols = row.find_all("td")
                row_data = {}
                for i, col in enumerate(cols):
                    # get_text already drops nested spans; strip surrounding whitespace
                    value = col.get_text(separator=" ", strip=True)

                    # Convert to appropriate types and handle missing values
                    if headers[i] in ("Strike", "Last Price", "Bid", "Ask", "Change"):
                        try:
                            value = float(value.replace(",", ""))
                        except ValueError:
                            value = None  # Empty/non-numeric cell
                    elif headers[i] in ("Volume", "Open Interest"):
                        try:
                            value = int(value.replace(",", ""))
                        except ValueError:
                            value = None  # Empty/non-numeric cell
                    elif value in ("-", ""):
                        value = None  # Explicitly treat '-' and empty strings as missing

                    if value is not None:  # Only include non-empty/non-nil values
                        row_data[headers[i]] = value

                if row_data:  # Only add row if it contains any data after cleaning
                    cleaned_data.append(row_data)

            PROCESSED_DATA = cleaned_data
        else:
            PROCESSED_DATA = []

        SCRAPE_STATUS = {"done": True, "error": None}

    except Exception as e:
        SCRAPE_STATUS = {"done": False, "error": str(e)}

    finally:
        driver.quit()

# Option 1: synchronous scrape - request waits for scrape to finish
@app.route('/scrape_sync', methods=['GET'])
def scrape_sync():
    run_selenium_scrape()
    if SCRAPE_STATUS["done"]:
        return jsonify(PROCESSED_DATA)
    else:
        return jsonify({"error": SCRAPE_STATUS["error"]}), 500

# Option 2: threaded scrape + join - start thread, then wait for it in request
@app.route('/scrape_threaded', methods=['GET'])
def scrape_threaded():
    thread = threading.Thread(target=run_selenium_scrape)
    thread.start()
    thread.join()  # wait for scraping to finish

    if SCRAPE_STATUS["done"]:
        return jsonify(PROCESSED_DATA)
    else:
        return jsonify({"error": SCRAPE_STATUS["error"]}), 500

# Your existing endpoints to check status or get result directly
@app.route('/status', methods=['GET'])
def status():
    return jsonify(SCRAPE_STATUS)


@app.route('/result', methods=['GET'])
def result():
    # This endpoint can now return the processed JSON data if a scrape was successful
    if SCRAPE_STATUS["done"]:
        return jsonify(PROCESSED_DATA)
    else:
        return jsonify({"error": "No data available or scrape not yet complete. Run /scrape_sync or /scrape_threaded first."}), 404

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)
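
# Example usage (a sketch, assuming the server is running locally on port 8000;
# adjust the host/port if you change app.run above):
#   curl http://localhost:8000/scrape_sync   # run a scrape and return the parsed option rows as JSON
#   curl http://localhost:8000/status        # check SCRAPE_STATUS ({"done": ..., "error": ...})
#   curl http://localhost:8000/result        # fetch PROCESSED_DATA from the last completed scrape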