Feature: Merge Truth Social scraper logic into SimpleScraper
This commit is contained in:
@@ -1308,5 +1308,163 @@ def profile():
|
||||
return jsonify(scrape_yahoo_profile(symbol))
|
||||
|
||||
|
||||
def scrape_truths_sync(count=10, handle="realDonaldTrump"):
    """Scrape the most recent posts ("truths") from a Truth Social profile.

    Launches a headless Chromium via Playwright, loads the profile page,
    and walks the virtualized status list, scrolling in small steps so
    items rendered off-screen become available.

    Args:
        count: Maximum number of posts to collect.
        handle: Truth Social handle, without the leading '@'.

    Returns:
        A list of post dicts with keys ``id``, ``content``, ``time``,
        ``likes_count``, ``comments_count``, ``retruths_count`` and
        ``media`` on success, or a ``{"error": str}`` dict on failure.
    """

    def _clean(value):
        # Normalize counter text ("1,234\n" etc.) to a single trimmed line.
        # Hoisted to function scope: it was previously re-defined per status.
        return str(value).strip().replace('\n', '')

    app.logger.info("Starting Truth Social scrape for handle=%s count=%d", handle, count)

    with sync_playwright() as p:
        launch_args = chromium_launch_args()
        if launch_args:
            app.logger.info("GPU acceleration enabled for Truth Social")

        browser = p.chromium.launch(headless=True, args=launch_args)
        context = browser.new_context(
            user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            # Tall viewport so more of the virtual list is rendered per pass.
            viewport={'width': 1920, 'height': 2000}
        )
        page = context.new_page()

        try:
            url = f"https://truthsocial.com/@{handle}"
            app.logger.info("Navigating to %s", url)
            page.goto(url, wait_until="domcontentloaded", timeout=60000)

            # Wait for content to load.
            page.wait_for_timeout(5000)

            # Scale down to fit more content in the viewport.
            page.evaluate("document.body.style.zoom = '0.7'")
            page.wait_for_timeout(2000)

            # Handle potential modal/ad overlay (best-effort; ignore failures).
            try:
                close_btn = page.query_selector('[data-testid="close-modal"]')
                if close_btn:
                    close_btn.click()
                    page.wait_for_timeout(1000)
                page.keyboard.press("Escape")
            except Exception:
                pass

            # Wait for any status to appear; fall back to a generic selector.
            selector = '[data-testid="status"]'
            try:
                page.wait_for_selector(selector, timeout=20000)
            except Exception:
                selector = '[data-id]'
                page.wait_for_selector(selector, timeout=10000)

            truths_data = []
            seen_ids = set()

            # Since virtual lists only render what is near the scroll position,
            # we need a few small scrolls even with a tall viewport.
            for _scroll_step in range(10):
                # Get all currently rendered statuses.
                statuses = page.query_selector_all(selector)

                for status in statuses:
                    if len(truths_data) >= count:
                        break

                    try:
                        # Find post ID and validate it belongs to the target
                        # handle (the timeline can contain ReTruths of others).
                        post_id = None
                        for link in status.query_selector_all('a'):
                            href = link.get_attribute('href')
                            if href and f"/@{handle}/posts/" in href:
                                post_id = href
                                break

                        if not post_id or post_id in seen_ids:
                            continue
                        seen_ids.add(post_id)

                        # Content text, with a fallback selector.
                        content_el = status.query_selector('[data-testid="status-content"]')
                        if not content_el:
                            content_el = status.query_selector('[data-testid="markup"]')
                        content_text = content_el.inner_text() if content_el else ""

                        # Timestamp: prefer the absolute value in the title attr,
                        # fall back to the relative display text.
                        time_el = status.query_selector('time')
                        time_text = time_el.get_attribute('title') if time_el else ""
                        if not time_text and time_el:
                            time_text = time_el.inner_text()

                        # Engagement counts from the action buttons.
                        def get_btn_text(btn_selector):
                            btn = status.query_selector(btn_selector)
                            return btn.inner_text() if btn else "0"

                        reply_count = get_btn_text('button[aria-label="Reply"]')
                        retruth_count = get_btn_text('button[aria-label="ReTruth"]')
                        like_count = get_btn_text('button[aria-label="Like"]')

                        # Media: attached images, skipping profile chrome.
                        media_urls = []
                        for img in status.query_selector_all('img'):
                            alt = img.get_attribute('alt')
                            if alt in ["Avatar", "Profile header", "Logo", "Verified Account"]:
                                continue
                            src = img.get_attribute('src')
                            if src and ("static-assets" in src or "proxy" in src):
                                media_urls.append(src)

                        for video in status.query_selector_all('video'):
                            src = video.get_attribute('src')
                            if not src:
                                source_tag = video.query_selector('source')
                                if source_tag:
                                    src = source_tag.get_attribute('src')
                            if src:
                                media_urls.append(src)

                        truths_data.append({
                            "id": post_id,
                            "content": content_text,
                            "time": time_text,
                            "likes_count": _clean(like_count),
                            "comments_count": _clean(reply_count),
                            "retruths_count": _clean(retruth_count),
                            # Dedupe while preserving first-seen order
                            # (list(set(...)) gave nondeterministic ordering).
                            "media": list(dict.fromkeys(media_urls)),
                        })

                    except Exception:
                        # A malformed status card should not abort the scrape.
                        continue

                if len(truths_data) >= count:
                    break

                # Scroll a bit to trigger rendering of the next items.
                page.evaluate("window.scrollBy(0, 500)")
                page.wait_for_timeout(1000)

            app.logger.info("Scraped %d truths", len(truths_data))
            return truths_data[:count]

        except Exception as e:
            app.logger.error("Truths scraper error: %s", e)
            return {"error": str(e)}
        finally:
            browser.close()
|
||||
|
||||
|
||||
@app.route("/truths")
def truths():
    """HTTP endpoint: return recent Truth Social posts as JSON.

    Query params:
        count:  Maximum number of posts (default 10; non-numeric or
                negative values are normalized).
        handle: Profile handle without '@' (default "realDonaldTrump").
    """
    try:
        count = int(request.args.get("count", 10))
    except ValueError:
        count = 10
    # Guard against negative counts, which would produce a nonsensical
    # negative slice in the scraper.
    count = max(0, count)

    handle = request.args.get("handle", "realDonaldTrump")
    app.logger.info("Received /truths request for handle=%s count=%d", handle, count)
    return jsonify(scrape_truths_sync(count, handle))
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Bind to all interfaces so the scraper API is reachable from other hosts.
    app.run(host="0.0.0.0", port=9777)
|
||||
|
||||
Reference in New Issue
Block a user