Compare commits

..

2 Commits

View File

@@ -10,6 +10,7 @@ import httpx
import time import time
import os import os
import subprocess import subprocess
import uuid
from urllib.parse import urlparse from urllib.parse import urlparse
from dotenv import load_dotenv from dotenv import load_dotenv
from atproto import Client, client_utils, models from atproto import Client, client_utils, models
@@ -56,6 +57,10 @@ logging.basicConfig(
level=logging.INFO, level=logging.INFO,
) )
# --- Per-run caches for efficiency ---
OG_TITLE_CACHE = {}
URL_RESOLUTION_CACHE = {}
# --- Custom Classes --- # --- Custom Classes ---
class ScrapedMedia: class ScrapedMedia:
@@ -317,6 +322,33 @@ def canonicalize_tweet_url(url):
return f"https://x.com/{handle}/status/{tweet_id}" return f"https://x.com/{handle}/status/{tweet_id}"
def extract_tweet_id(tweet_url):
    """Return the numeric status ID embedded in a tweet URL, or None.

    Accepts any URL containing a ``/status/<digits>`` segment; returns
    None for falsy input or URLs without a status segment.
    """
    if not tweet_url:
        return None
    found = re.search(r"/status/(\d+)", tweet_url)
    return found.group(1) if found else None
def make_unique_video_temp_base(tweet_url=None):
    """Build a collision-resistant base name for temporary video files.

    Combines the tweet's status ID (or "unknown"), a millisecond
    timestamp, and a short random hex suffix so concurrent/repeated
    runs never clobber each other's temp files.
    """
    parts = (
        "temp_video",
        extract_tweet_id(tweet_url) or "unknown",
        str(int(time.time() * 1000)),
        uuid.uuid4().hex[:8],
    )
    base = "_".join(parts)
    logging.info(f"🎞️ Using unique temp video base: {base}")
    return base
def remove_file_quietly(path):
    """Delete *path* if it exists, logging failures instead of raising.

    Safe to call with None or a path that does not exist; in either
    case it is a no-op.
    """
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
    except Exception as e:
        logging.warning(f"⚠️ Could not remove temp file {path}: {e}")
    else:
        logging.info(f"🧹 Removed temp file: {path}")
def is_x_or_twitter_domain(url): def is_x_or_twitter_domain(url):
try: try:
normalized = normalize_urlish_token(url) or url normalized = normalize_urlish_token(url) or url
@@ -346,7 +378,6 @@ def extract_urls_from_text(text):
return [] return []
repaired = repair_broken_urls(text) repaired = repair_broken_urls(text)
pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+'
return re.findall(pattern, repaired) return re.findall(pattern, repaired)
@@ -372,7 +403,36 @@ def extract_quoted_text_from_og_title(og_title):
return None return None
def should_fetch_og_title(tweet):
    """
    Avoid fetching og:title unless it is likely to improve the text.

    Returns True when the scraped text is missing, contains unresolved
    t.co links, looks truncated, or is suspiciously short — i.e. when
    the og:title meta tag is likely to carry better content.
    """
    text = clean_post_text(tweet.text or "")
    urls = extract_urls_from_text(text)
    if not text:
        return True
    if any(is_tco_domain(normalize_urlish_token(u) or u) for u in urls):
        return True
    # Fix: the original tested `"" in text`, which is always True (the empty
    # string is a substring of every string), so every tweet triggered a
    # fetch and the length guard below was dead. The intended check is the
    # Unicode ellipsis truncation marker, mirroring the "..." check.
    if "\u2026" in text or text.endswith("..."):
        return True
    if len(text) < 35:
        return True
    return False
def fetch_tweet_og_title_text(tweet_url): def fetch_tweet_og_title_text(tweet_url):
if not tweet_url:
return None
if tweet_url in OG_TITLE_CACHE:
logging.info(f"⚡ Using cached og:title text for {tweet_url}")
return OG_TITLE_CACHE[tweet_url]
browser = None browser = None
context = None context = None
page = None page = None
@@ -397,7 +457,7 @@ def fetch_tweet_og_title_text(tweet_url):
page.goto(tweet_url, wait_until="domcontentloaded", timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS) page.goto(tweet_url, wait_until="domcontentloaded", timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS)
try: try:
page.wait_for_selector('meta[property="og:title"]', timeout=10000) page.wait_for_selector('meta[property="og:title"]', timeout=7000)
except Exception: except Exception:
pass pass
@@ -405,10 +465,13 @@ def fetch_tweet_og_title_text(tweet_url):
extracted = extract_quoted_text_from_og_title(og_title) extracted = extract_quoted_text_from_og_title(og_title)
if extracted: if extracted:
extracted = clean_post_text(extracted)
OG_TITLE_CACHE[tweet_url] = extracted
logging.info(f"✅ Extracted tweet text from og:title for {tweet_url}") logging.info(f"✅ Extracted tweet text from og:title for {tweet_url}")
return clean_post_text(extracted) return extracted
logging.info(f" No usable og:title text extracted for {tweet_url}") logging.info(f" No usable og:title text extracted for {tweet_url}")
OG_TITLE_CACHE[tweet_url] = None
return None return None
except Exception as e: except Exception as e:
@@ -418,6 +481,7 @@ def fetch_tweet_og_title_text(tweet_url):
take_error_screenshot(page, "tweet_og_title_failed") take_error_screenshot(page, "tweet_og_title_failed")
except Exception: except Exception:
pass pass
OG_TITLE_CACHE[tweet_url] = None
return None return None
finally: finally:
try: try:
@@ -478,19 +542,19 @@ def resolve_tco_with_playwright(url):
except Exception as e: except Exception as e:
logging.warning(f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}") logging.warning(f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}")
time.sleep(3) time.sleep(2)
final_url = canonicalize_url(page.url) final_url = canonicalize_url(page.url)
for _ in range(6): for _ in range(4):
if final_url and is_external_non_x_url(final_url): if final_url and is_external_non_x_url(final_url):
break break
try: try:
page.wait_for_load_state("networkidle", timeout=3000) page.wait_for_load_state("networkidle", timeout=2000)
except Exception: except Exception:
pass pass
time.sleep(1) time.sleep(0.8)
final_url = canonicalize_url(page.url) final_url = canonicalize_url(page.url)
logging.info(f"🌐 Playwright final URL for {url}: {final_url}") logging.info(f"🌐 Playwright final URL for {url}: {final_url}")
@@ -523,7 +587,7 @@ def resolve_tco_with_playwright(url):
return canonicalize_url(url) return canonicalize_url(url)
def resolve_url_if_needed(url, http_client): def resolve_url_if_needed(url, http_client, allow_playwright_fallback=True):
if not url: if not url:
return None return None
@@ -532,21 +596,34 @@ def resolve_url_if_needed(url, http_client):
if not cleaned: if not cleaned:
return None return None
if cleaned in URL_RESOLUTION_CACHE:
logging.info(f"⚡ Using cached URL resolution: {cleaned} -> {URL_RESOLUTION_CACHE[cleaned]}")
return URL_RESOLUTION_CACHE[cleaned]
if not is_tco_domain(cleaned): if not is_tco_domain(cleaned):
URL_RESOLUTION_CACHE[cleaned] = cleaned
return cleaned return cleaned
resolved_http = resolve_tco_with_httpx(cleaned, http_client) resolved_http = resolve_tco_with_httpx(cleaned, http_client)
if is_external_non_x_url(resolved_http): if is_external_non_x_url(resolved_http):
URL_RESOLUTION_CACHE[cleaned] = resolved_http
return resolved_http
if not allow_playwright_fallback:
URL_RESOLUTION_CACHE[cleaned] = resolved_http
return resolved_http return resolved_http
resolved_browser = resolve_tco_with_playwright(cleaned) resolved_browser = resolve_tco_with_playwright(cleaned)
if is_external_non_x_url(resolved_browser): if is_external_non_x_url(resolved_browser):
logging.info(f"✅ Resolved t.co via Playwright to external URL: {resolved_browser}") logging.info(f"✅ Resolved t.co via Playwright to external URL: {resolved_browser}")
URL_RESOLUTION_CACHE[cleaned] = resolved_browser
return resolved_browser return resolved_browser
if resolved_http and not is_tco_domain(resolved_http): if resolved_http and not is_tco_domain(resolved_http):
URL_RESOLUTION_CACHE[cleaned] = resolved_http
return resolved_http return resolved_http
URL_RESOLUTION_CACHE[cleaned] = cleaned
return cleaned return cleaned
@@ -591,9 +668,9 @@ def extract_first_visible_non_x_url(text):
return None return None
def extract_first_resolved_external_url(text, http_client): def extract_first_resolved_external_url(text, http_client, allow_playwright_fallback=True):
for url in extract_non_x_urls_from_text(text or ""): for url in extract_non_x_urls_from_text(text or ""):
resolved = resolve_url_if_needed(url, http_client) resolved = resolve_url_if_needed(url, http_client, allow_playwright_fallback=allow_playwright_fallback)
if not resolved: if not resolved:
continue continue
@@ -604,10 +681,12 @@ def extract_first_resolved_external_url(text, http_client):
return None return None
def sanitize_visible_urls_in_text(text, http_client): def sanitize_visible_urls_in_text(text, http_client, has_media=False):
""" """
Resolve visible t.co URLs in the text, remove x.com/twitter.com URLs from Faster logic:
visible text, normalize www. URLs, and deduplicate repeated external URLs. - remove x/twitter URLs from visible text
- resolve t.co
- if a t.co resolves to x/twitter and tweet has media, do not use Playwright fallback
""" """
if not text: if not text:
return text, None return text, None
@@ -636,9 +715,20 @@ def sanitize_visible_urls_in_text(text, http_client):
final_url = cleaned final_url = cleaned
if is_tco_domain(cleaned): if is_tco_domain(cleaned):
resolved = resolve_url_if_needed(cleaned, http_client) resolved_http_first = resolve_tco_with_httpx(cleaned, http_client)
if resolved:
final_url = resolved if is_external_non_x_url(resolved_http_first):
final_url = resolved_http_first
URL_RESOLUTION_CACHE[cleaned] = final_url
else:
if has_media and resolved_http_first and is_x_or_twitter_domain(resolved_http_first):
final_url = resolved_http_first
URL_RESOLUTION_CACHE[cleaned] = final_url
logging.info(
f"⚡ Skipping Playwright t.co fallback because tweet has media and httpx already resolved to X/Twitter URL: {final_url}"
)
else:
final_url = resolve_url_if_needed(cleaned, http_client, allow_playwright_fallback=True)
if is_x_or_twitter_domain(final_url): if is_x_or_twitter_domain(final_url):
replacements[raw_url] = "" replacements[raw_url] = ""
@@ -704,9 +794,10 @@ def sanitize_visible_urls_in_text(text, http_client):
def build_effective_tweet_text(tweet, http_client): def build_effective_tweet_text(tweet, http_client):
scraped_text = clean_post_text(tweet.text or "") scraped_text = clean_post_text(tweet.text or "")
has_media = bool(tweet.media)
og_title_text = None og_title_text = None
if tweet.tweet_url: if should_fetch_og_title(tweet):
og_title_text = fetch_tweet_og_title_text(tweet.tweet_url) og_title_text = fetch_tweet_og_title_text(tweet.tweet_url)
candidate_text = scraped_text candidate_text = scraped_text
@@ -718,11 +809,19 @@ def build_effective_tweet_text(tweet, http_client):
candidate_text = og_title_text candidate_text = og_title_text
logging.info("🧾 Using og:title-derived tweet text as primary content") logging.info("🧾 Using og:title-derived tweet text as primary content")
candidate_text, resolved_primary_external_url = sanitize_visible_urls_in_text(candidate_text, http_client) candidate_text, resolved_primary_external_url = sanitize_visible_urls_in_text(
candidate_text,
http_client,
has_media=has_media,
)
candidate_text = clean_post_text(candidate_text) candidate_text = clean_post_text(candidate_text)
if not resolved_primary_external_url: if not resolved_primary_external_url:
resolved_primary_external_url = extract_first_resolved_external_url(candidate_text, http_client) resolved_primary_external_url = extract_first_resolved_external_url(
candidate_text,
http_client,
allow_playwright_fallback=not has_media,
)
return candidate_text, resolved_primary_external_url return candidate_text, resolved_primary_external_url
@@ -1180,6 +1279,8 @@ def get_recent_bsky_posts(client, handle, limit=30):
if getattr(record, "reply", None) is not None: if getattr(record, "reply", None) is not None:
continue continue
# no-op
text = getattr(record, "text", "") or "" text = getattr(record, "text", "") or ""
normalized_text = normalize_post_text(text) normalized_text = normalize_post_text(text)
@@ -1697,7 +1798,7 @@ def scrape_tweets_via_playwright(username, password, email, target_handle):
) )
page = context.new_page() page = context.new_page()
page.goto("https://x.com/home") page.goto("https://x.com/home")
time.sleep(4) time.sleep(3)
if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url: if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
logging.info("✅ Session is valid!") logging.info("✅ Session is valid!")
@@ -1770,7 +1871,7 @@ def scrape_tweets_via_playwright(username, password, email, target_handle):
try: try:
page.wait_for_selector("article", timeout=20000) page.wait_for_selector("article", timeout=20000)
time.sleep(3) time.sleep(2)
articles = page.locator("article").all() articles = page.locator("article").all()
logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...") logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...")
@@ -1882,7 +1983,7 @@ def extract_video_url_from_tweet_page(context, tweet_url):
try: try:
logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}") logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000) page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
time.sleep(3) time.sleep(2)
player = page.locator('[data-testid="videoPlayer"]').first player = page.locator('[data-testid="videoPlayer"]').first
@@ -1900,7 +2001,7 @@ def extract_video_url_from_tweet_page(context, tweet_url):
else: else:
logging.warning("⚠️ No video player locator found on tweet page") logging.warning("⚠️ No video player locator found on tweet page")
for _ in range(12): for _ in range(8):
if current_best(): if current_best():
break break
time.sleep(1) time.sleep(1)
@@ -1919,7 +2020,7 @@ def extract_video_url_from_tweet_page(context, tweet_url):
except Exception: except Exception:
pass pass
for _ in range(8): for _ in range(5):
if current_best(): if current_best():
break break
time.sleep(1) time.sleep(1)
@@ -2055,12 +2156,9 @@ def download_and_crop_video(video_url, output_path):
return None return None
finally: finally:
for path in [temp_input, temp_trimmed, temp_output]: remove_file_quietly(temp_input)
if os.path.exists(path): remove_file_quietly(temp_trimmed)
try: remove_file_quietly(temp_output)
os.remove(path)
except Exception:
pass
def candidate_matches_existing_bsky(candidate, recent_bsky_posts): def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
@@ -2123,7 +2221,8 @@ def sync_feeds(args):
candidate_tweets = [] candidate_tweets = []
with httpx.Client() as resolve_http_client: # --- Cheap prefilter before expensive processing ---
cheap_candidates = []
for tweet in reversed(tweets): for tweet in reversed(tweets):
try: try:
tweet_time = arrow.get(tweet.created_on) tweet_time = arrow.get(tweet.created_on)
@@ -2132,11 +2231,31 @@ def sync_feeds(args):
logging.info(f"⏭️ Skipping old tweet from {tweet_time}") logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
continue continue
canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url)
if canonical_tweet_url and canonical_tweet_url in state.get("posted_tweets", {}):
logging.info(f"⚡ Early skip due to known tweet URL in local state: {canonical_tweet_url}")
continue
scraped_text = clean_post_text(tweet.text or "")
if not scraped_text and not tweet.media:
logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
continue
cheap_candidates.append((tweet, tweet_time, canonical_tweet_url))
except Exception as e:
logging.warning(f"⚠️ Failed during cheap prefilter: {e}")
logging.info(f"{len(cheap_candidates)} tweets remain after cheap prefilter.")
with httpx.Client() as resolve_http_client:
for tweet, tweet_time, canonical_tweet_url in cheap_candidates:
try:
full_clean_text, resolved_primary_external_url = build_effective_tweet_text(tweet, resolve_http_client) full_clean_text, resolved_primary_external_url = build_effective_tweet_text(tweet, resolve_http_client)
normalized_text = normalize_post_text(full_clean_text) normalized_text = normalize_post_text(full_clean_text)
if not normalized_text: if not normalized_text and not tweet.media:
logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}") logging.info(f"⏭️ Skipping empty/blank tweet after enrichment from {tweet_time}")
continue continue
ordered_non_x_urls = extract_ordered_non_x_urls(full_clean_text) ordered_non_x_urls = extract_ordered_non_x_urls(full_clean_text)
@@ -2169,7 +2288,7 @@ def sync_feeds(args):
media_fingerprint = build_media_fingerprint(tweet) media_fingerprint = build_media_fingerprint(tweet)
text_media_key = build_text_media_key(normalized_text, media_fingerprint) text_media_key = build_text_media_key(normalized_text, media_fingerprint)
candidate_tweets.append({ candidate = {
"tweet": tweet, "tweet": tweet,
"tweet_time": tweet_time, "tweet_time": tweet_time,
"raw_text": raw_text, "raw_text": raw_text,
@@ -2177,7 +2296,7 @@ def sync_feeds(args):
"normalized_text": normalized_text, "normalized_text": normalized_text,
"media_fingerprint": media_fingerprint, "media_fingerprint": media_fingerprint,
"text_media_key": text_media_key, "text_media_key": text_media_key,
"canonical_tweet_url": canonicalize_tweet_url(tweet.tweet_url), "canonical_tweet_url": canonical_tweet_url,
"canonical_non_x_urls": canonical_non_x_urls, "canonical_non_x_urls": canonical_non_x_urls,
"ordered_non_x_urls": ordered_non_x_urls, "ordered_non_x_urls": ordered_non_x_urls,
"primary_non_x_url": primary_non_x_url, "primary_non_x_url": primary_non_x_url,
@@ -2185,15 +2304,8 @@ def sync_feeds(args):
"looks_like_title_plus_url": looks_like_title_plus_url_post(full_clean_text), "looks_like_title_plus_url": looks_like_title_plus_url_post(full_clean_text),
"has_video": has_video, "has_video": has_video,
"has_photo": has_photo, "has_photo": has_photo,
}) }
except Exception as e:
logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")
logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for duplicate comparison.")
tweets_to_post = []
for candidate in candidate_tweets:
is_dup_state, reason_state = candidate_matches_state(candidate, state) is_dup_state, reason_state = candidate_matches_state(candidate, state)
if is_dup_state: if is_dup_state:
logging.info(f"⏭️ Skipping candidate due to local state duplicate match on: {reason_state}") logging.info(f"⏭️ Skipping candidate due to local state duplicate match on: {reason_state}")
@@ -2204,11 +2316,14 @@ def sync_feeds(args):
logging.info(f"⏭️ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}") logging.info(f"⏭️ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}")
continue continue
tweets_to_post.append(candidate) candidate_tweets.append(candidate)
logging.info(f"📬 {len(tweets_to_post)} tweets remain after duplicate filtering.") except Exception as e:
logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")
if not tweets_to_post: logging.info(f"📬 {len(candidate_tweets)} tweets remain after duplicate filtering.")
if not candidate_tweets:
logging.info("✅ No new tweets need posting after duplicate comparison.") logging.info("✅ No new tweets need posting after duplicate comparison.")
return return
@@ -2233,7 +2348,7 @@ def sync_feeds(args):
context = browser.new_context(**context_kwargs) context = browser.new_context(**context_kwargs)
for candidate in tweets_to_post: for candidate in candidate_tweets:
tweet = candidate["tweet"] tweet = candidate["tweet"]
tweet_time = candidate["tweet_time"] tweet_time = candidate["tweet_time"]
raw_text = candidate["raw_text"] raw_text = candidate["raw_text"]
@@ -2259,7 +2374,9 @@ def sync_feeds(args):
logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.") logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.")
media_upload_failures.append("video:no_tweet_url") media_upload_failures.append("video:no_tweet_url")
else: else:
temp_video_path = "temp_video.mp4" temp_video_base = make_unique_video_temp_base(tweet.tweet_url)
temp_video_path = f"{temp_video_base}.mp4"
try: try:
real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url) real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url)
if not real_video_url: if not real_video_url:
@@ -2280,8 +2397,10 @@ def sync_feeds(args):
if not video_embed: if not video_embed:
media_upload_failures.append(f"video:embed_failed:{tweet.tweet_url}") media_upload_failures.append(f"video:embed_failed:{tweet.tweet_url}")
finally: finally:
if os.path.exists(temp_video_path): remove_file_quietly(temp_video_path)
os.remove(temp_video_path) remove_file_quietly(f"{temp_video_base}_source.mp4")
remove_file_quietly(f"{temp_video_base}_trimmed.mp4")
remove_file_quietly(f"{temp_video_base}_compressed.mp4")
if not video_embed: if not video_embed:
logging.warning( logging.warning(