diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index eefc880..0d2dc9b 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -1,3 +1,8 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +"""Daemon that mirrors tweets from X/Twitter to Bluesky.""" + + import argparse import arrow import hashlib @@ -147,14 +152,14 @@ def repair_broken_urls(text): r"((?:https?://|www\.)[^\s<>\"]*)[\r\n]+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", r"\1\2", text, - flags=re.IGNORECASE + flags=re.IGNORECASE, ) text = re.sub( r"((?:https?://|www\.)[^\s<>\"]*)\s+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", r"\1\2", text, - flags=re.IGNORECASE + flags=re.IGNORECASE, ) text = split_concatenated_urls(text) @@ -287,12 +292,53 @@ def remove_trailing_ellipsis_line(text): return "\n".join(lines).strip() +def remove_orphaned_digit_lines_before_hashtags(text): + """ + Remove lines that contain only a number (e.g. '5') when they appear + immediately before a line starting with a hashtag. These are typically + scraped UI artifacts (image counts, engagement badges, etc.). 
+ """ + if not text: + return text + + lines = text.splitlines() + if len(lines) < 2: + return text + + result = [] + changed = False + i = 0 + + while i < len(lines): + stripped = lines[i].strip() + + if ( + stripped + and re.fullmatch(r"\d{1,3}", stripped) + and i + 1 < len(lines) + and lines[i + 1].strip().startswith("#") + ): + logging.info(f"πŸ”§ Removing orphaned digit line '{stripped}' before hashtag line") + changed = True + i += 1 + continue + + result.append(lines[i]) + i += 1 + + if changed: + return "\n".join(result) + + return text + + def clean_post_text(text): raw_text = (text or "").strip() raw_text = repair_broken_urls(raw_text) raw_text = repair_broken_mentions(raw_text) raw_text = strip_line_edge_whitespace(raw_text) raw_text = remove_trailing_ellipsis_line(raw_text) + raw_text = remove_orphaned_digit_lines_before_hashtags(raw_text) return raw_text.strip() @@ -334,7 +380,11 @@ def canonicalize_tweet_url(url): return None url = url.strip() - match = re.search(r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", url, re.IGNORECASE) + match = re.search( + r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", + url, + re.IGNORECASE, + ) if not match: return url.lower() @@ -374,7 +424,13 @@ def is_x_or_twitter_domain(url): try: normalized = normalize_urlish_token(url) or url hostname = (urlparse(normalized).hostname or "").lower() - return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"} + return hostname in { + "x.com", + "www.x.com", + "twitter.com", + "www.twitter.com", + "mobile.twitter.com", + } except Exception: return False @@ -417,7 +473,7 @@ def extract_quoted_text_from_og_title(og_title): first_quote = decoded.find('"') last_quote = decoded.rfind('"') if 0 <= first_quote < last_quote: - extracted = decoded[first_quote + 1:last_quote].strip() + extracted = decoded[first_quote + 1 : last_quote].strip() if extracted: return extracted @@ -461,7 +517,7 @@ def 
fetch_tweet_og_title_text(tweet_url): with sync_playwright() as p: browser = p.chromium.launch( headless=True, - args=["--disable-blink-features=AutomationControlled"] + args=["--disable-blink-features=AutomationControlled"], ) context = browser.new_context( user_agent=( @@ -469,17 +525,24 @@ def fetch_tweet_og_title_text(tweet_url): "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/145.0.7632.6 Safari/537.36" ), - viewport={"width": 1280, "height": 900} + viewport={"width": 1280, "height": 900}, ) page = context.new_page() - page.goto(tweet_url, wait_until="domcontentloaded", timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS) + page.goto( + tweet_url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, + ) try: page.wait_for_selector('meta[property="og:title"]', timeout=7000) except Exception: pass - og_title = page.locator('meta[property="og:title"]').first.get_attribute("content") + og_title = ( + page.locator('meta[property="og:title"]') + .first.get_attribute("content") + ) extracted = extract_quoted_text_from_og_title(og_title) if extracted: @@ -493,7 +556,9 @@ def fetch_tweet_og_title_text(tweet_url): return None except Exception as e: - logging.warning(f"⚠️ Could not extract og:title text from {tweet_url}: {repr(e)}") + logging.warning( + f"⚠️ Could not extract og:title text from {tweet_url}: {repr(e)}" + ) try: if page: take_error_screenshot(page, "tweet_og_title_failed") @@ -521,7 +586,9 @@ def fetch_tweet_og_title_text(tweet_url): def resolve_tco_with_httpx(url, http_client): try: - response = http_client.get(url, timeout=URL_RESOLVE_TIMEOUT, follow_redirects=True) + response = http_client.get( + url, timeout=URL_RESOLVE_TIMEOUT, follow_redirects=True + ) final_url = canonicalize_url(str(response.url)) if final_url: logging.info(f"πŸ”— Resolved t.co with httpx: {url} -> {final_url}") @@ -543,7 +610,7 @@ def resolve_tco_with_playwright(url): with sync_playwright() as p: browser = p.chromium.launch( headless=True, - 
args=["--disable-blink-features=AutomationControlled"] + args=["--disable-blink-features=AutomationControlled"], ) context = browser.new_context( user_agent=( @@ -551,14 +618,20 @@ def resolve_tco_with_playwright(url): "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/145.0.7632.6 Safari/537.36" ), - viewport={"width": 1280, "height": 900} + viewport={"width": 1280, "height": 900}, ) page = context.new_page() try: - page.goto(url, wait_until="domcontentloaded", timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS) + page.goto( + url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, + ) except Exception as e: - logging.warning(f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}") + logging.warning( + f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}" + ) time.sleep(2) final_url = canonicalize_url(page.url) @@ -579,7 +652,9 @@ def resolve_tco_with_playwright(url): return final_url except Exception as e: - logging.warning(f"⚠️ Playwright t.co resolution failed for {url}: {repr(e)}") + logging.warning( + f"⚠️ Playwright t.co resolution failed for {url}: {repr(e)}" + ) try: if page: take_error_screenshot(page, "tco_resolve_failed") @@ -615,7 +690,9 @@ def resolve_url_if_needed(url, http_client, allow_playwright_fallback=True): return None if cleaned in URL_RESOLUTION_CACHE: - logging.info(f"⚑ Using cached URL resolution: {cleaned} -> {URL_RESOLUTION_CACHE[cleaned]}") + logging.info( + f"⚑ Using cached URL resolution: {cleaned} -> {URL_RESOLUTION_CACHE[cleaned]}" + ) return URL_RESOLUTION_CACHE[cleaned] if not is_tco_domain(cleaned): @@ -633,7 +710,9 @@ def resolve_url_if_needed(url, http_client, allow_playwright_fallback=True): resolved_browser = resolve_tco_with_playwright(cleaned) if is_external_non_x_url(resolved_browser): - logging.info(f"βœ… Resolved t.co via Playwright to external URL: {resolved_browser}") + logging.info( + f"βœ… Resolved t.co via Playwright to external URL: {resolved_browser}" + ) URL_RESOLUTION_CACHE[cleaned] = 
resolved_browser return resolved_browser @@ -686,9 +765,15 @@ def extract_first_visible_non_x_url(text): return None -def extract_first_resolved_external_url(text, http_client, allow_playwright_fallback=True): +def extract_first_resolved_external_url( + text, http_client, allow_playwright_fallback=True +): for url in extract_non_x_urls_from_text(text or ""): - resolved = resolve_url_if_needed(url, http_client, allow_playwright_fallback=allow_playwright_fallback) + resolved = resolve_url_if_needed( + url, + http_client, + allow_playwright_fallback=allow_playwright_fallback, + ) if not resolved: continue @@ -716,24 +801,23 @@ def resolve_card_url(card_url, http_client): return cleaned if is_tco_domain(cleaned): - resolved = resolve_url_if_needed(cleaned, http_client, allow_playwright_fallback=True) + resolved = resolve_url_if_needed( + cleaned, http_client, allow_playwright_fallback=True + ) if resolved and is_external_non_x_url(resolved): logging.info(f"πŸ”— Resolved card t.co URL: {cleaned} -> {resolved}") return resolved if is_x_or_twitter_domain(cleaned): - logging.info(f"ℹ️ Card URL resolves to X/Twitter domain, ignoring: {cleaned}") + logging.info( + f"ℹ️ Card URL resolves to X/Twitter domain, ignoring: {cleaned}" + ) return None return cleaned def sanitize_visible_urls_in_text(text, http_client, has_media=False): - """ - - remove x/twitter URLs from visible text - - resolve t.co - - if a t.co resolves to x/twitter and tweet has media, skip Playwright fallback - """ if not text: return text, None @@ -756,7 +840,9 @@ def sanitize_visible_urls_in_text(text, http_client, has_media=False): if is_x_or_twitter_domain(cleaned): replacements[raw_url] = "" - logging.info(f"🧹 Removing X/Twitter URL from visible text: {cleaned}") + logging.info( + f"🧹 Removing X/Twitter URL from visible text: {cleaned}" + ) continue final_url = cleaned @@ -767,18 +853,27 @@ def sanitize_visible_urls_in_text(text, http_client, has_media=False): final_url = resolved_http_first 
URL_RESOLUTION_CACHE[cleaned] = final_url else: - if has_media and resolved_http_first and is_x_or_twitter_domain(resolved_http_first): + if ( + has_media + and resolved_http_first + and is_x_or_twitter_domain(resolved_http_first) + ): final_url = resolved_http_first URL_RESOLUTION_CACHE[cleaned] = final_url logging.info( - f"⚑ Skipping Playwright t.co fallback because tweet has media and httpx already resolved to X/Twitter URL: {final_url}" + f"⚑ Skipping Playwright t.co fallback because tweet has media " + f"and httpx already resolved to X/Twitter URL: {final_url}" ) else: - final_url = resolve_url_if_needed(cleaned, http_client, allow_playwright_fallback=True) + final_url = resolve_url_if_needed( + cleaned, http_client, allow_playwright_fallback=True + ) if is_x_or_twitter_domain(final_url): replacements[raw_url] = "" - logging.info(f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}") + logging.info( + f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}" + ) continue if normalized and normalized.startswith("https://www."): @@ -851,7 +946,9 @@ def build_effective_tweet_text(tweet, http_client): scraped_urls = extract_urls_from_text(scraped_text) og_urls = extract_urls_from_text(og_title_text) - if len(og_title_text) >= len(scraped_text) or (og_urls and not scraped_urls): + if len(og_title_text) >= len(scraped_text) or ( + og_urls and not scraped_urls + ): candidate_text = og_title_text logging.info("🧾 Using og:title-derived tweet text as primary content") @@ -862,17 +959,21 @@ def build_effective_tweet_text(tweet, http_client): ) candidate_text = clean_post_text(candidate_text) - # --- KEY FIX: also resolve the card_url scraped from the tweet's link preview --- - resolved_card_url = resolve_card_url(getattr(tweet, "card_url", None), http_client) + # --- Resolve the card_url scraped from the tweet's link preview --- + resolved_card_url = resolve_card_url( + getattr(tweet, "card_url", None), http_client + ) if resolved_card_url and 
is_external_non_x_url(resolved_card_url): if not resolved_primary_external_url: resolved_primary_external_url = resolved_card_url - logging.info(f"πŸ”— Using resolved card URL as primary external URL: {resolved_card_url}") + logging.info( + f"πŸ”— Using resolved card URL as primary external URL: {resolved_card_url}" + ) elif resolved_primary_external_url != resolved_card_url: logging.info( - f"ℹ️ Card URL ({resolved_card_url}) differs from text URL ({resolved_primary_external_url}). " - f"Preferring card URL for external embed." + f"ℹ️ Card URL ({resolved_card_url}) differs from text URL " + f"({resolved_primary_external_url}). Preferring card URL for external embed." ) resolved_primary_external_url = resolved_card_url @@ -900,7 +1001,9 @@ def remove_url_from_visible_text(text, url_to_remove): for url in line_urls: normalized = normalize_urlish_token(url) or url - cleaned_candidate = canonicalize_url(strip_trailing_url_punctuation(normalized)) + cleaned_candidate = canonicalize_url( + strip_trailing_url_punctuation(normalized) + ) if cleaned_candidate == canonical_target: pattern = re.escape(url) new_line = re.sub(pattern, "", new_line) @@ -929,7 +1032,11 @@ def looks_like_title_plus_url_post(text): urls_in_last_line = extract_ordered_non_x_urls(last_line) total_urls = extract_ordered_non_x_urls(repaired) - return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://", "www.")) + return ( + len(urls_in_last_line) == 1 + and len(total_urls) == 1 + and last_line.startswith(("http://", "https://", "www.")) + ) def looks_like_url_and_tag_tail(text, primary_non_x_url=None): @@ -989,8 +1096,7 @@ def find_tail_preservation_start(text, primary_non_x_url): candidates.append(generous_start) reasonable_candidates = [ - c for c in candidates - if 0 <= c < url_pos and (url_pos - c) <= 180 + c for c in candidates if 0 <= c < url_pos and (url_pos - c) <= 180 ] if reasonable_candidates: @@ -1008,15 +1114,22 @@ def 
truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH): if len(text) <= max_length: return text - truncated = text[:max_length - 3] + truncated = text[: max_length - 3] last_space = truncated.rfind(" ") if last_space > 0: return truncated[:last_space] + "..." return truncated + "..." -def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LENGTH): - if not text or tail_start is None or tail_start < 0 or tail_start >= len(text): +def truncate_text_preserving_tail( + text, tail_start, max_length=BSKY_TEXT_MAX_LENGTH +): + if ( + not text + or tail_start is None + or tail_start < 0 + or tail_start >= len(text) + ): return truncate_text_safely(text, max_length) if len(text) <= max_length: @@ -1028,11 +1141,11 @@ def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LEN reserve = len(tail) + 4 if reserve >= max_length: - shortened_tail = tail[-(max_length - 3):].strip() + shortened_tail = tail[-(max_length - 3) :].strip() first_space = shortened_tail.find(" ") if 0 <= first_space <= 30: - shortened_tail = shortened_tail[first_space + 1:].strip() + shortened_tail = shortened_tail[first_space + 1 :].strip() return f"...{shortened_tail}" @@ -1055,28 +1168,45 @@ def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LEN return truncate_text_safely(text, max_length) -def choose_final_visible_text(full_clean_text, primary_non_x_url=None, prefer_full_text_without_url=True): +def choose_final_visible_text( + full_clean_text, primary_non_x_url=None, prefer_full_text_without_url=True +): text = clean_post_text(full_clean_text or "") if not text: return text if len(text) <= BSKY_TEXT_MAX_LENGTH: - logging.info("🟒 Original cleaned tweet text fits in Bluesky. Preserving exact text.") + logging.info( + "🟒 Original cleaned tweet text fits in Bluesky. Preserving exact text." 
+ ) return text if primary_non_x_url: tail_start = find_tail_preservation_start(text, primary_non_x_url) if tail_start is not None: - preserved = truncate_text_preserving_tail(text, tail_start, BSKY_TEXT_MAX_LENGTH) + preserved = truncate_text_preserving_tail( + text, tail_start, BSKY_TEXT_MAX_LENGTH + ) if preserved and len(preserved) <= BSKY_TEXT_MAX_LENGTH: - logging.info("πŸ”— Preserving meaningful ending block with URL/hashtags in visible Bluesky text") + logging.info( + "πŸ”— Preserving meaningful ending block with URL/hashtags in visible Bluesky text" + ) return preserved - if prefer_full_text_without_url and not looks_like_url_and_tag_tail(text, primary_non_x_url): - text_without_url = remove_url_from_visible_text(text, primary_non_x_url).strip() - if text_without_url and len(text_without_url) <= BSKY_TEXT_MAX_LENGTH: - logging.info("πŸ”— Keeping full visible text by removing long external URL from body and using external card") + if prefer_full_text_without_url and not looks_like_url_and_tag_tail( + text, primary_non_x_url + ): + text_without_url = remove_url_from_visible_text( + text, primary_non_x_url + ).strip() + if ( + text_without_url + and len(text_without_url) <= BSKY_TEXT_MAX_LENGTH + ): + logging.info( + "πŸ”— Keeping full visible text by removing long external URL from body and using external card" + ) return text_without_url truncated = truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH) @@ -1110,7 +1240,9 @@ def build_media_fingerprint(tweet): stable_value = re.sub(r"[?&]name=\w+", "", stable_value) stable_value = re.sub(r"[?&]format=\w+", "", stable_value) elif media_type == "video": - stable_value = canonicalize_tweet_url(tweet.tweet_url or media_url or "") + stable_value = canonicalize_tweet_url( + tweet.tweet_url or media_url or "" + ) parts.append(f"{media_type}:{stable_value}") @@ -1131,12 +1263,20 @@ def build_bsky_media_fingerprint(post_view): if images: for img in images: image_obj = getattr(img, "image", None) - ref = getattr(image_obj, 
"ref", None) or getattr(image_obj, "cid", None) or str(image_obj) + ref = ( + getattr(image_obj, "ref", None) + or getattr(image_obj, "cid", None) + or str(image_obj) + ) parts.append(f"photo:{ref}") video = getattr(embed, "video", None) if video: - ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video) + ref = ( + getattr(video, "ref", None) + or getattr(video, "cid", None) + or str(video) + ) parts.append(f"video:{ref}") external = getattr(embed, "external", None) @@ -1157,17 +1297,23 @@ def build_bsky_media_fingerprint(post_view): def build_text_media_key(normalized_text, media_fingerprint): - return hashlib.sha256(f"{normalized_text}||{media_fingerprint}".encode("utf-8")).hexdigest() + return hashlib.sha256( + f"{normalized_text}||{media_fingerprint}".encode("utf-8") + ).hexdigest() def create_bsky_client(base_url, handle, password): normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/") - logging.info(f"πŸ” Connecting Bluesky client via base URL: {normalized_base_url}") + logging.info( + f"πŸ” Connecting Bluesky client via base URL: {normalized_base_url}" + ) try: client = Client(base_url=normalized_base_url) except TypeError: - logging.warning("⚠️ Your atproto Client does not accept base_url in constructor. Falling back.") + logging.warning( + "⚠️ Your atproto Client does not accept base_url in constructor. Falling back." 
+ ) client = Client() try: if hasattr(client, "base_url"): @@ -1175,12 +1321,15 @@ def create_bsky_client(base_url, handle, password): elif hasattr(client, "_base_url"): client._base_url = normalized_base_url except Exception as e: - logging.warning(f"⚠️ Could not apply custom base URL cleanly: {e}") + logging.warning( + f"⚠️ Could not apply custom base URL cleanly: {e}" + ) client.login(handle, password) return client +# --- State Management --- def default_state(): return { "version": 1, @@ -1192,7 +1341,9 @@ def default_state(): def load_state(state_path=STATE_PATH): if not os.path.exists(state_path): - logging.info(f"🧠 No state file found at {state_path}. Starting with empty memory.") + logging.info( + f"🧠 No state file found at {state_path}. Starting with empty memory." + ) return default_state() try: @@ -1211,7 +1362,9 @@ def load_state(state_path=STATE_PATH): return state except Exception as e: - logging.warning(f"⚠️ Could not load state file {state_path}: {e}. Reinitializing.") + logging.warning( + f"⚠️ Could not load state file {state_path}: {e}. Reinitializing." 
+ ) return default_state() @@ -1239,12 +1392,16 @@ def remember_posted_tweet(state, candidate, bsky_uri=None): "canonical_tweet_url": canonical_tweet_url, "normalized_text": candidate["normalized_text"], "raw_text": candidate["raw_text"], - "full_clean_text": candidate.get("full_clean_text", candidate["raw_text"]), + "full_clean_text": candidate.get( + "full_clean_text", candidate["raw_text"] + ), "media_fingerprint": candidate["media_fingerprint"], "text_media_key": candidate["text_media_key"], "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]), "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []), - "resolved_primary_external_url": candidate.get("resolved_primary_external_url"), + "resolved_primary_external_url": candidate.get( + "resolved_primary_external_url" + ), "bsky_uri": bsky_uri, "tweet_created_on": candidate["tweet"].created_on, "tweet_url": candidate["tweet"].tweet_url, @@ -1307,6 +1464,7 @@ def prune_state(state, max_entries=5000): return state +# --- Bluesky Feed Helpers --- def extract_urls_from_facets(record): urls = [] @@ -1348,38 +1506,53 @@ def get_recent_bsky_posts(client, handle, limit=30): canonical_non_x_urls = set() for url in urls: - if not is_tco_domain(url) and not is_x_or_twitter_domain(url): - canonical = canonicalize_url(normalize_urlish_token(url) or url) + if not is_tco_domain(url) and not is_x_or_twitter_domain( + url + ): + canonical = canonicalize_url( + normalize_urlish_token(url) or url + ) if canonical: canonical_non_x_urls.add(canonical) media_fingerprint = build_bsky_media_fingerprint(item.post) - text_media_key = build_text_media_key(normalized_text, media_fingerprint) + text_media_key = build_text_media_key( + normalized_text, media_fingerprint + ) - recent_posts.append({ - "uri": getattr(item.post, "uri", None), - "text": text, - "normalized_text": normalized_text, - "canonical_non_x_urls": canonical_non_x_urls, - "media_fingerprint": media_fingerprint, - "text_media_key": text_media_key, - 
"created_at": getattr(record, "created_at", None), - }) + recent_posts.append( + { + "uri": getattr(item.post, "uri", None), + "text": text, + "normalized_text": normalized_text, + "canonical_non_x_urls": canonical_non_x_urls, + "media_fingerprint": media_fingerprint, + "text_media_key": text_media_key, + "created_at": getattr(record, "created_at", None), + } + ) except Exception as e: - logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}") + logging.debug( + f"Skipping one Bluesky feed item during dedupe fetch: {e}" + ) except Exception as e: - logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}") + logging.warning( + f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}" + ) return recent_posts +# --- Upload / Retry Helpers --- def get_rate_limit_wait_seconds(error_obj, default_delay): try: headers = getattr(error_obj, "headers", None) if headers: - reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") + reset_value = headers.get("ratelimit-reset") or headers.get( + "RateLimit-Reset" + ) if reset_value: now_ts = int(time.time()) reset_ts = int(reset_value) @@ -1419,12 +1592,14 @@ def upload_blob_with_retry(client, binary_data, media_label="media"): except Exception as e: last_exception = e error_text = str(e) - is_rate_limited = "429" in error_text or "RateLimitExceeded" in error_text + is_rate_limited = ( + "429" in error_text or "RateLimitExceeded" in error_text + ) if is_rate_limited: backoff_delay = min( BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), - BSKY_BLOB_UPLOAD_MAX_DELAY + BSKY_BLOB_UPLOAD_MAX_DELAY, ) wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) @@ -1441,9 +1616,14 @@ def upload_blob_with_retry(client, binary_data, media_label="media"): ) break - if is_transient_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES: + if ( + is_transient_error(e) + and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES + ): 
transient_attempts += 1 - wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts + wait_seconds = ( + BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts + ) logging.warning( f"⏳ Transient blob upload failure for {media_label}: {repr(e)}. " f"Transient retry {transient_attempts}/{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s." @@ -1455,8 +1635,12 @@ def upload_blob_with_retry(client, binary_data, media_label="media"): if hasattr(e, "response") and e.response is not None: try: - logging.warning(f"Upload response status: {e.response.status_code}") - logging.warning(f"Upload response body: {e.response.text}") + logging.warning( + f"Upload response status: {e.response.status_code}" + ) + logging.warning( + f"Upload response body: {e.response.text}" + ) except Exception: pass @@ -1480,12 +1664,14 @@ def send_post_with_retry(client, **kwargs): except Exception as e: last_exception = e error_text = str(e) - is_rate_limited = "429" in error_text or "RateLimitExceeded" in error_text + is_rate_limited = ( + "429" in error_text or "RateLimitExceeded" in error_text + ) if is_rate_limited: backoff_delay = min( BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), - BSKY_SEND_POST_MAX_DELAY + BSKY_SEND_POST_MAX_DELAY, ) wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) @@ -1497,10 +1683,15 @@ def send_post_with_retry(client, **kwargs): time.sleep(wait_seconds) continue else: - logging.error(f"❌ Exhausted send_post retries after rate limiting: {repr(e)}") + logging.error( + f"❌ Exhausted send_post retries after rate limiting: {repr(e)}" + ) raise - if is_transient_error(e) and attempt < BSKY_SEND_POST_MAX_RETRIES: + if ( + is_transient_error(e) + and attempt < BSKY_SEND_POST_MAX_RETRIES + ): wait_seconds = BSKY_SEND_POST_BASE_DELAY * attempt logging.warning( f"⏳ Transient send_post failure: {repr(e)}. 
" @@ -1514,6 +1705,7 @@ def send_post_with_retry(client, **kwargs): raise last_exception +# --- Image Compression --- def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES): try: with Image.open(io.BytesIO(image_bytes)) as img: @@ -1524,13 +1716,24 @@ def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES): if max_dim > BSKY_IMAGE_MAX_DIMENSION: scale = BSKY_IMAGE_MAX_DIMENSION / max_dim - new_size = (max(1, int(width * scale)), max(1, int(height * scale))) + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) img = img.resize(new_size, Image.LANCZOS) - logging.info(f"πŸ–ΌοΈ Resized post image to {new_size[0]}x{new_size[1]}") + logging.info( + f"πŸ–ΌοΈ Resized post image to {new_size[0]}x{new_size[1]}" + ) for quality in [90, 82, 75, 68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]: out = io.BytesIO() - img.save(out, format="JPEG", quality=quality, optimize=True, progressive=True) + img.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) data = out.getvalue() logging.info( @@ -1548,12 +1751,21 @@ def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES): if max_dim > target_dim: scale = target_dim / max_dim - new_size = (max(1, int(width * scale)), max(1, int(height * scale))) + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) resized = resized.resize(new_size, Image.LANCZOS) for quality in [68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]: out = io.BytesIO() - resized.save(out, format="JPEG", quality=quality, optimize=True, progressive=True) + resized.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) data = out.getvalue() logging.info( @@ -1569,17 +1781,22 @@ def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES): return None - def get_blob_from_url(media_url, client, http_client): try: - r = http_client.get(media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, 
follow_redirects=True) + r = http_client.get( + media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True + ) if r.status_code != 200: - logging.warning(f"Could not fetch media {media_url}: HTTP {r.status_code}") + logging.warning( + f"Could not fetch media {media_url}: HTTP {r.status_code}" + ) return None content = r.content if not content: - logging.warning(f"Could not fetch media {media_url}: empty response body") + logging.warning( + f"Could not fetch media {media_url}: empty response body" + ) return None content_type = (r.headers.get("content-type") or "").lower() @@ -1587,14 +1804,19 @@ def get_blob_from_url(media_url, client, http_client): if content_type.startswith("image/"): original_size = len(content) - logging.info(f"πŸ–ΌοΈ Downloaded post image {media_url} ({original_size} bytes / {original_size / 1024:.2f} KB)") + logging.info( + f"πŸ–ΌοΈ Downloaded post image {media_url} " + f"({original_size} bytes / {original_size / 1024:.2f} KB)" + ) if original_size > BSKY_IMAGE_MAX_BYTES: logging.info( f"πŸ–ΌοΈ Post image exceeds safe Bluesky limit " f"({original_size} bytes > {BSKY_IMAGE_MAX_BYTES} bytes). Compressing..." 
) - compressed = compress_post_image_to_limit(content, BSKY_IMAGE_MAX_BYTES) + compressed = compress_post_image_to_limit( + content, BSKY_IMAGE_MAX_BYTES + ) if compressed: upload_bytes = compressed logging.info( @@ -1602,27 +1824,39 @@ def get_blob_from_url(media_url, client, http_client): f"({len(upload_bytes) / 1024:.2f} KB)" ) else: - logging.warning(f"⚠️ Could not compress post image to safe limit: {media_url}") + logging.warning( + f"⚠️ Could not compress post image to safe limit: {media_url}" + ) return None - return upload_blob_with_retry(client, upload_bytes, media_label=media_url) + return upload_blob_with_retry( + client, upload_bytes, media_label=media_url + ) except Exception as e: logging.warning(f"Could not fetch media {media_url}: {repr(e)}") return None + def get_blob_from_file(file_path, client): try: if not os.path.exists(file_path): - logging.warning(f"Could not upload local file {file_path}: file does not exist") + logging.warning( + f"Could not upload local file {file_path}: file does not exist" + ) return None file_size = os.path.getsize(file_path) file_size_mb = file_size / (1024 * 1024) - logging.info(f"πŸ“¦ Uploading local file {file_path} ({file_size_mb:.2f} MB)") + logging.info( + f"πŸ“¦ Uploading local file {file_path} ({file_size_mb:.2f} MB)" + ) - if file_path.lower().endswith(".mp4") and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB: + if ( + file_path.lower().endswith(".mp4") + and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB + ): logging.warning( f"Could not upload local file {file_path}: " f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)" @@ -1632,22 +1866,32 @@ def get_blob_from_file(file_path, client): with open(file_path, "rb") as f: binary_data = f.read() - return upload_blob_with_retry(client, binary_data, media_label=file_path) + return upload_blob_with_retry( + client, binary_data, media_label=file_path + ) except Exception as e: - logging.warning(f"Could not upload local file {file_path}: {repr(e)}") + 
logging.warning( + f"Could not upload local file {file_path}: {repr(e)}" + ) if hasattr(e, "response") and e.response is not None: try: - logging.warning(f"Upload response status: {e.response.status_code}") - logging.warning(f"Upload response body: {e.response.text}") + logging.warning( + f"Upload response status: {e.response.status_code}" + ) + logging.warning( + f"Upload response body: {e.response.text}" + ) except Exception: pass return None -def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES): +def compress_external_thumb_to_limit( + image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES +): try: with Image.open(io.BytesIO(image_bytes)) as img: img = img.convert("RGB") @@ -1657,13 +1901,26 @@ def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_B if max_dim > EXTERNAL_THUMB_MAX_DIMENSION: scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim - new_size = (max(1, int(width * scale)), max(1, int(height * scale))) + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) img = img.resize(new_size, Image.LANCZOS) - logging.info(f"πŸ–ΌοΈ Resized external thumb to {new_size[0]}x{new_size[1]}") + logging.info( + f"πŸ–ΌοΈ Resized external thumb to {new_size[0]}x{new_size[1]}" + ) - for quality in [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: + for quality in [ + 85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY + ]: out = io.BytesIO() - img.save(out, format="JPEG", quality=quality, optimize=True, progressive=True) + img.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) data = out.getvalue() logging.info( @@ -1681,12 +1938,21 @@ def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_B if max_dim > target_dim: scale = target_dim / max_dim - new_size = (max(1, int(width * scale)), max(1, int(height * scale))) + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) resized = resized.resize(new_size, 
Image.LANCZOS) for quality in [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: out = io.BytesIO() - resized.save(out, format="JPEG", quality=quality, optimize=True, progressive=True) + resized.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) data = out.getvalue() logging.info( @@ -1698,25 +1964,35 @@ def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_B return data except Exception as e: - logging.warning(f"Could not compress external thumbnail: {repr(e)}") + logging.warning( + f"Could not compress external thumbnail: {repr(e)}" + ) return None def get_external_thumb_blob_from_url(image_url, client, http_client): try: - r = http_client.get(image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True) + r = http_client.get( + image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True + ) if r.status_code != 200: - logging.warning(f"Could not fetch external thumb {image_url}: HTTP {r.status_code}") + logging.warning( + f"Could not fetch external thumb {image_url}: HTTP {r.status_code}" + ) return None content = r.content if not content: - logging.warning(f"Could not fetch external thumb {image_url}: empty body") + logging.warning( + f"Could not fetch external thumb {image_url}: empty body" + ) return None original_size_kb = len(content) / 1024 - logging.info(f"πŸ–ΌοΈ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)") + logging.info( + f"πŸ–ΌοΈ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)" + ) upload_bytes = content if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES: @@ -1724,67 +2000,113 @@ def get_external_thumb_blob_from_url(image_url, client, http_client): f"πŸ–ΌοΈ External thumb exceeds safe limit " f"({original_size_kb:.2f} KB > {EXTERNAL_THUMB_MAX_BYTES / 1024:.2f} KB). Compressing..." 
) - compressed = compress_external_thumb_to_limit(upload_bytes, EXTERNAL_THUMB_MAX_BYTES) + compressed = compress_external_thumb_to_limit( + upload_bytes, EXTERNAL_THUMB_MAX_BYTES + ) if compressed: upload_bytes = compressed - logging.info(f"βœ… External thumb compressed to {len(upload_bytes) / 1024:.2f} KB") + logging.info( + f"βœ… External thumb compressed to {len(upload_bytes) / 1024:.2f} KB" + ) else: - logging.warning("⚠️ Could not compress external thumb to fit limit. Will omit thumbnail.") + logging.warning( + "⚠️ Could not compress external thumb to fit limit. Will omit thumbnail." + ) return None else: - logging.info("βœ… External thumb already within safe size limit.") + logging.info( + "βœ… External thumb already within safe size limit." + ) - blob = upload_blob_with_retry(client, upload_bytes, media_label=f"external-thumb:{image_url}") + blob = upload_blob_with_retry( + client, + upload_bytes, + media_label=f"external-thumb:{image_url}", + ) if blob: return blob - logging.warning("⚠️ External thumb upload failed. Will omit thumbnail.") + logging.warning( + "⚠️ External thumb upload failed. Will omit thumbnail." 
+ ) return None except Exception as e: - logging.warning(f"Could not fetch/upload external thumb {image_url}: {repr(e)}") + logging.warning( + f"Could not fetch/upload external thumb {image_url}: {repr(e)}" + ) return None def fetch_link_metadata(url, http_client): try: - r = http_client.get(url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True) + r = http_client.get( + url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True + ) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") - title = (soup.find("meta", property="og:title") or soup.find("title")) - desc = ( - soup.find("meta", property="og:description") - or soup.find("meta", attrs={"name": "description"}) - ) - image = ( - soup.find("meta", property="og:image") - or soup.find("meta", attrs={"name": "twitter:image"}) - ) + title = soup.find("meta", property="og:title") or soup.find("title") + desc = soup.find( + "meta", property="og:description" + ) or soup.find("meta", attrs={"name": "description"}) + image = soup.find( + "meta", property="og:image" + ) or soup.find("meta", attrs={"name": "twitter:image"}) return { - "title": title["content"] if title and title.has_attr("content") else (title.text.strip() if title and title.text else ""), - "description": desc["content"] if desc and desc.has_attr("content") else "", - "image": image["content"] if image and image.has_attr("content") else None, + "title": ( + title["content"] + if title and title.has_attr("content") + else ( + title.text.strip() + if title and title.text + else "" + ) + ), + "description": ( + desc["content"] + if desc and desc.has_attr("content") + else "" + ), + "image": ( + image["content"] + if image and image.has_attr("content") + else None + ), } except Exception as e: - logging.warning(f"Could not fetch link metadata for {url}: {repr(e)}") + logging.warning( + f"Could not fetch link metadata for {url}: {repr(e)}" + ) return {} -def build_external_link_embed(url, client, http_client, fallback_title="Link"): +def 
build_external_link_embed( + url, client, http_client, fallback_title="Link" +): link_metadata = fetch_link_metadata(url, http_client) thumb_blob = None if link_metadata.get("image"): - thumb_blob = get_external_thumb_blob_from_url(link_metadata["image"], client, http_client) + thumb_blob = get_external_thumb_blob_from_url( + link_metadata["image"], client, http_client + ) if thumb_blob: - logging.info("βœ… External link card thumbnail prepared successfully") + logging.info( + "βœ… External link card thumbnail prepared successfully" + ) else: - logging.info("ℹ️ External link card will be posted without thumbnail") + logging.info( + "ℹ️ External link card will be posted without thumbnail" + ) - if link_metadata.get("title") or link_metadata.get("description") or thumb_blob: + if ( + link_metadata.get("title") + or link_metadata.get("description") + or thumb_blob + ): return models.AppBskyEmbedExternal.Main( external=models.AppBskyEmbedExternal.External( uri=url, @@ -1858,7 +2180,9 @@ def make_rich(content): def build_dynamic_alt(raw_text): dynamic_alt = clean_post_text(raw_text) dynamic_alt = dynamic_alt.replace("\n", " ").strip() - dynamic_alt = re.sub(r"(?:(?:https?://)|(?:www\.))\S+", "", dynamic_alt).strip() + dynamic_alt = re.sub( + r"(?:(?:https?://)|(?:www\.))\S+", "", dynamic_alt + ).strip() if len(dynamic_alt) > 150: dynamic_alt = dynamic_alt[:147] + "..." @@ -1870,20 +2194,27 @@ def build_dynamic_alt(raw_text): def build_video_embed(video_blob, alt_text): try: - return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text) + return models.AppBskyEmbedVideo.Main( + video=video_blob, alt=alt_text + ) except AttributeError: - logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.") + logging.error( + "❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto." 
+ ) return None -def scrape_tweets_via_playwright(username, password, email, target_handle): +# --- Twitter Scraping --- +def scrape_tweets_via_playwright( + username, password, email, target_handle +): tweets = [] state_file = "twitter_browser_state.json" with sync_playwright() as p: browser = p.chromium.launch( headless=True, - args=["--disable-blink-features=AutomationControlled"] + args=["--disable-blink-features=AutomationControlled"], ) clean_ua = ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " @@ -1895,29 +2226,40 @@ def scrape_tweets_via_playwright(username, password, email, target_handle): needs_login = True if os.path.exists(state_file): - logging.info("βœ… Found existing browser state. Attempting to bypass login...") + logging.info( + "βœ… Found existing browser state. Attempting to bypass login..." + ) context = browser.new_context( user_agent=clean_ua, viewport={"width": 1920, "height": 1080}, - storage_state=state_file + storage_state=state_file, ) page = context.new_page() page.goto("https://x.com/home") time.sleep(3) - if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url: + if ( + page.locator( + '[data-testid="SideNav_NewTweet_Button"]' + ).is_visible() + or "/home" in page.url + ): logging.info("βœ… Session is valid!") needs_login = False else: - logging.warning("⚠️ Saved session expired or invalid. Re-logging in...") + logging.warning( + "⚠️ Saved session expired or invalid. Re-logging in..." + ) context.close() os.remove(state_file) if needs_login: - logging.info("πŸš€ Launching fresh browser for automated Twitter login...") + logging.info( + "πŸš€ Launching fresh browser for automated Twitter login..." 
+ ) context = browser.new_context( user_agent=clean_ua, - viewport={"width": 1920, "height": 1080} + viewport={"width": 1920, "height": 1080}, ) page = context.new_page() @@ -1927,31 +2269,52 @@ def scrape_tweets_via_playwright(username, password, email, target_handle): sign_in_button.wait_for(state="visible", timeout=15000) sign_in_button.click(force=True) - page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000) + page.wait_for_selector( + 'h1:has-text("Sign in to X")', + state="visible", + timeout=25000, + ) logging.info(f"πŸ‘€ Entering username: {username}...") time.sleep(1) - username_input = page.locator('input[autocomplete="username"]').first + username_input = page.locator( + 'input[autocomplete="username"]' + ).first username_input.wait_for(state="visible", timeout=15000) username_input.click(force=True) username_input.press_sequentially(username, delay=100) - page.locator('button:has-text("Next")').first.click(force=True) + page.locator('button:has-text("Next")').first.click( + force=True + ) page.wait_for_selector( 'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]', - timeout=15000 + timeout=15000, ) time.sleep(1) - if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible(): - logging.warning("πŸ›‘οΈ Security challenge detected! Entering email/phone...") - page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email) - sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first + if page.locator( + 'input[data-testid="ocfEnterTextTextInput"]' + ).is_visible() or page.locator( + 'input[name="text"]' + ).is_visible(): + logging.warning( + "πŸ›‘οΈ Security challenge detected! Entering email/phone..." 
+ ) + page.fill( + 'input[data-testid="ocfEnterTextTextInput"], input[name="text"]', + email, + ) + sec_next = page.locator( + '[data-testid="ocfEnterTextNextButton"], span:has-text("Next")' + ).first if sec_next.is_visible(): sec_next.click(force=True) else: page.keyboard.press("Enter") - page.wait_for_selector('input[name="password"]', timeout=15000) + page.wait_for_selector( + 'input[name="password"]', timeout=15000 + ) time.sleep(1) logging.info("πŸ”‘ Entering password...") @@ -1970,7 +2333,9 @@ def scrape_tweets_via_playwright(username, password, email, target_handle): browser.close() return [] - logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...") + logging.info( + f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets..." + ) page = context.new_page() page.goto(f"https://x.com/{target_handle}") @@ -1979,7 +2344,10 @@ def scrape_tweets_via_playwright(username, password, email, target_handle): time.sleep(2) articles = page.locator("article").all() - logging.info(f"πŸ“Š Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...") + logging.info( + f"πŸ“Š Found {len(articles)} tweets on screen. " + f"Parsing up to {SCRAPE_TWEET_LIMIT}..." 
+ ) for article in articles[:SCRAPE_TWEET_LIMIT]: try: @@ -1994,71 +2362,122 @@ def scrape_tweets_via_playwright(username, password, email, target_handle): if time_link.is_visible(): href = time_link.get_attribute("href") if href: - tweet_url = f"https://x.com{href}" if href.startswith("/") else href + tweet_url = ( + f"https://x.com{href}" + if href.startswith("/") + else href + ) # --- Retweet detection --- is_retweet = False try: - social_context = article.locator('[data-testid="socialContext"]').first + social_context = article.locator( + '[data-testid="socialContext"]' + ).first if social_context.is_visible(): - context_text = social_context.inner_text().lower() - if "reposted" in context_text or "retweeted" in context_text or "ha repostejat" in context_text or "ha retuitat" in context_text or "repostejat" in context_text: + context_text = ( + social_context.inner_text().lower() + ) + repost_keywords = [ + "reposted", + "retweeted", + "ha repostejat", + "ha retuitat", + "repostejat", + "ha reposteado", + "retuiteΓ³", + ] + if any( + kw in context_text + for kw in repost_keywords + ): is_retweet = True - logging.info(f"πŸ” Detected retweet/repost: {tweet_url}") + logging.info( + f"πŸ” Detected retweet/repost: {tweet_url}" + ) except Exception: pass - text_locator = article.locator('[data-testid="tweetText"]').first - text = text_locator.inner_text() if text_locator.is_visible() else "" + text_locator = article.locator( + '[data-testid="tweetText"]' + ).first + text = ( + text_locator.inner_text() + if text_locator.is_visible() + else "" + ) media_urls = [] - photo_locators = article.locator('[data-testid="tweetPhoto"] img').all() + photo_locators = article.locator( + '[data-testid="tweetPhoto"] img' + ).all() for img in photo_locators: src = img.get_attribute("src") if src: - src = re.sub(r"&name=\w+", "&name=large", src) + src = re.sub( + r"&name=\w+", "&name=large", src + ) media_urls.append((src, "photo")) - video_locators = 
article.locator('[data-testid="videoPlayer"]').all() + video_locators = article.locator( + '[data-testid="videoPlayer"]' + ).all() if video_locators: media_urls.append((tweet_url or "", "video")) # --- Card URL extraction (link preview card) --- card_url = None try: - card_locator = article.locator('[data-testid="card.wrapper"] a[href]').first + card_locator = article.locator( + '[data-testid="card.wrapper"] a[href]' + ).first if card_locator.is_visible(): card_href = card_locator.get_attribute("href") if card_href: card_url = card_href.strip() - logging.info(f"πŸƒ Scraped card URL from tweet: {card_url}") + logging.info( + f"πŸƒ Scraped card URL from tweet: {card_url}" + ) except Exception: pass - # Fallback: try to find card link via role="link" inside card wrapper if not card_url: try: - card_role_link = article.locator('[data-testid="card.wrapper"] [role="link"]').first + card_role_link = article.locator( + '[data-testid="card.wrapper"] [role="link"]' + ).first if card_role_link.is_visible(): - card_a = card_role_link.locator("a[href]").first + card_a = card_role_link.locator( + "a[href]" + ).first if card_a.is_visible(): - card_href = card_a.get_attribute("href") + card_href = card_a.get_attribute( + "href" + ) if card_href: card_url = card_href.strip() - logging.info(f"πŸƒ Scraped card URL (fallback) from tweet: {card_url}") + logging.info( + f"πŸƒ Scraped card URL (fallback) from tweet: {card_url}" + ) except Exception: pass - tweets.append(ScrapedTweet( - created_at, text, media_urls, - tweet_url=tweet_url, - card_url=card_url, - is_retweet=is_retweet, - )) + tweets.append( + ScrapedTweet( + created_at, + text, + media_urls, + tweet_url=tweet_url, + card_url=card_url, + is_retweet=is_retweet, + ) + ) except Exception as e: - logging.warning(f"⚠️ Failed to parse a specific tweet: {e}") + logging.warning( + f"⚠️ Failed to parse a specific tweet: {e}" + ) continue except Exception as e: @@ -2069,6 +2488,7 @@ def scrape_tweets_via_playwright(username, 
password, email, target_handle): return tweets +# --- Video Extraction & Processing --- def extract_video_url_from_tweet_page(context, tweet_url): page = context.new_page() best_m3u8_url = None @@ -2079,10 +2499,13 @@ def extract_video_url_from_tweet_page(context, tweet_url): url_l = url.lower() content_type_l = content_type.lower() return ( - "/aud/" in url_l or - "/audio/" in url_l or - "mp4a" in url_l or - ("audio/" in content_type_l and "video/" not in content_type_l) + "/aud/" in url_l + or "/audio/" in url_l + or "mp4a" in url_l + or ( + "audio/" in content_type_l + and "video/" not in content_type_l + ) ) def handle_response(response): @@ -2101,18 +2524,24 @@ def extract_video_url_from_tweet_page(context, tweet_url): return if ( - ".m3u8" in url_l or - "application/vnd.apple.mpegurl" in content_type_l or - "application/x-mpegurl" in content_type_l + ".m3u8" in url_l + or "application/vnd.apple.mpegurl" in content_type_l + or "application/x-mpegurl" in content_type_l ): if best_m3u8_url is None: best_m3u8_url = url logging.info(f"πŸ“Ί Found HLS playlist URL: {url}") return - if ".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l: + if ( + ".mp4" in url_l + or "video/mp4" in content_type_l + or "audio/mp4" in content_type_l + ): if is_audio_only_mp4(url, content_type): - logging.info(f"πŸ”‡ Ignoring audio-only MP4: {url}") + logging.info( + f"πŸ”‡ Ignoring audio-only MP4: {url}" + ) return if best_video_mp4_url is None: @@ -2129,8 +2558,12 @@ def extract_video_url_from_tweet_page(context, tweet_url): return best_m3u8_url or best_video_mp4_url try: - logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}") - page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000) + logging.info( + f"🎬 Opening tweet page to capture video URL: {tweet_url}" + ) + page.goto( + tweet_url, wait_until="domcontentloaded", timeout=30000 + ) time.sleep(2) player = page.locator('[data-testid="videoPlayer"]').first @@ -2147,7 +2580,9 
@@ def extract_video_url_from_tweet_page(context, tweet_url): except Exception as e: logging.info(f"⚠️ First player click failed: {e}") else: - logging.warning("⚠️ No video player locator found on tweet page") + logging.warning( + "⚠️ No video player locator found on tweet page" + ) for _ in range(8): if current_best(): @@ -2155,7 +2590,9 @@ def extract_video_url_from_tweet_page(context, tweet_url): time.sleep(1) if not current_best() and player.count() > 0: - logging.info("πŸ” No media URL found yet, retrying player interaction...") + logging.info( + "πŸ” No media URL found yet, retrying player interaction..." + ) try: player.click(force=True, timeout=5000) time.sleep(2) @@ -2175,14 +2612,20 @@ def extract_video_url_from_tweet_page(context, tweet_url): selected_url = current_best() if selected_url: - logging.info(f"βœ… Selected media URL for download: {selected_url}") + logging.info( + f"βœ… Selected media URL for download: {selected_url}" + ) else: - logging.warning(f"⚠️ No playable media URL detected on tweet page: {tweet_url}") + logging.warning( + f"⚠️ No playable media URL detected on tweet page: {tweet_url}" + ) return selected_url except Exception as e: - logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}") + logging.warning( + f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}" + ) return None finally: page.close() @@ -2194,7 +2637,9 @@ def download_and_crop_video(video_url, output_path): temp_output = output_path.replace(".mp4", "_compressed.mp4") try: - logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}") + logging.info( + f"⬇️ Downloading video source with ffmpeg: {video_url}" + ) video_url_l = video_url.lower() @@ -2203,10 +2648,14 @@ def download_and_crop_video(video_url, output_path): download_cmd = [ "ffmpeg", "-y", - "-protocol_whitelist", "file,http,https,tcp,tls,crypto", - "-allowed_extensions", "ALL", - "-i", video_url, - "-c", "copy", + "-protocol_whitelist", + 
"file,http,https,tcp,tls,crypto", + "-allowed_extensions", + "ALL", + "-i", + video_url, + "-c", + "copy", temp_input, ] else: @@ -2214,32 +2663,47 @@ def download_and_crop_video(video_url, output_path): download_cmd = [ "ffmpeg", "-y", - "-i", video_url, - "-c", "copy", + "-i", + video_url, + "-c", + "copy", temp_input, ] download_result = subprocess.run( - download_cmd, capture_output=True, text=True, - timeout=SUBPROCESS_TIMEOUT_SECONDS + download_cmd, + capture_output=True, + text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, ) if download_result.returncode != 0: - logging.error(f"❌ ffmpeg download failed:\n{download_result.stderr}") + logging.error( + f"❌ ffmpeg download failed:\n{download_result.stderr}" + ) return None - if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0: - logging.error("❌ Downloaded video source file is missing or empty.") + if ( + not os.path.exists(temp_input) + or os.path.getsize(temp_input) == 0 + ): + logging.error( + "❌ Downloaded video source file is missing or empty." + ) return None logging.info(f"βœ… Video downloaded: {temp_input}") video_clip = VideoFileClip(temp_input) - duration = float(video_clip.duration) if video_clip.duration else 0 + duration = ( + float(video_clip.duration) if video_clip.duration else 0 + ) if duration <= 0: video_clip.close() - logging.error("❌ Downloaded video has invalid or unknown duration.") + logging.error( + "❌ Downloaded video has invalid or unknown duration." 
+ ) return None end_time = min(VIDEO_MAX_DURATION_SECONDS, duration) @@ -2256,57 +2720,87 @@ def download_and_crop_video(video_url, output_path): preset="veryfast", bitrate="1800k", audio_bitrate="128k", - logger=None + logger=None, ) video_clip.close() cropped_clip.close() - if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0: - logging.error("❌ Trimmed video output is missing or empty.") + if ( + not os.path.exists(temp_trimmed) + or os.path.getsize(temp_trimmed) == 0 + ): + logging.error( + "❌ Trimmed video output is missing or empty." + ) return None trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024) - logging.info(f"πŸ“¦ Trimmed video size before compression: {trimmed_size_mb:.2f} MB") + logging.info( + f"πŸ“¦ Trimmed video size before compression: {trimmed_size_mb:.2f} MB" + ) compress_cmd = [ "ffmpeg", "-y", - "-i", temp_trimmed, - "-vf", "scale='min(720,iw)':-2", - "-c:v", "libx264", - "-preset", "veryfast", - "-crf", "30", - "-maxrate", "1800k", - "-bufsize", "3600k", - "-c:a", "aac", - "-b:a", "128k", - "-movflags", "+faststart", + "-i", + temp_trimmed, + "-vf", + "scale='min(720,iw)':-2", + "-c:v", + "libx264", + "-preset", + "veryfast", + "-crf", + "30", + "-maxrate", + "1800k", + "-bufsize", + "3600k", + "-c:a", + "aac", + "-b:a", + "128k", + "-movflags", + "+faststart", temp_output, ] compress_result = subprocess.run( - compress_cmd, capture_output=True, text=True, - timeout=SUBPROCESS_TIMEOUT_SECONDS + compress_cmd, + capture_output=True, + text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, ) if compress_result.returncode != 0: - logging.error(f"❌ ffmpeg compression failed:\n{compress_result.stderr}") + logging.error( + f"❌ ffmpeg compression failed:\n{compress_result.stderr}" + ) return None - if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0: - logging.error("❌ Compressed video output is missing or empty.") + if ( + not os.path.exists(temp_output) + or os.path.getsize(temp_output) == 0 + ): + 
logging.error( + "❌ Compressed video output is missing or empty." + ) return None final_size_mb = os.path.getsize(temp_output) / (1024 * 1024) - logging.info(f"βœ… Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)") + logging.info( + f"βœ… Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)" + ) os.replace(temp_output, output_path) logging.info(f"βœ… Final video ready: {output_path}") return output_path except subprocess.TimeoutExpired: - logging.error(f"❌ ffmpeg subprocess timed out after {SUBPROCESS_TIMEOUT_SECONDS}s") + logging.error( + f"❌ ffmpeg subprocess timed out after {SUBPROCESS_TIMEOUT_SECONDS}s" + ) return None except Exception as e: @@ -2319,6 +2813,7 @@ def download_and_crop_video(video_url, output_path): remove_file_quietly(temp_output) +# --- Deduplication --- def candidate_matches_existing_bsky(candidate, recent_bsky_posts): candidate_non_x_urls = candidate["canonical_non_x_urls"] candidate_text_media_key = candidate["text_media_key"] @@ -2328,9 +2823,9 @@ def candidate_matches_existing_bsky(candidate, recent_bsky_posts): existing_non_x_urls = existing["canonical_non_x_urls"] if ( - candidate_non_x_urls and - candidate_non_x_urls == existing_non_x_urls and - candidate_normalized_text == existing["normalized_text"] + candidate_non_x_urls + and candidate_non_x_urls == existing_non_x_urls + and candidate_normalized_text == existing["normalized_text"] ): return True, "bsky:normalized_text_plus_non_x_urls" @@ -2343,6 +2838,7 @@ def candidate_matches_existing_bsky(candidate, recent_bsky_posts): return False, None +# --- Main Sync Logic --- def sync_feeds(args): logging.info("πŸ”„ Starting sync cycle...") @@ -2350,7 +2846,9 @@ def sync_feeds(args): bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS if dry_run: - logging.info("πŸ§ͺ DRY RUN MODE β€” no posts will be created on Bluesky.") + logging.info( + "πŸ§ͺ DRY RUN MODE β€” no posts will be created on Bluesky." 
+ ) try: state = load_state(STATE_PATH) @@ -2359,11 +2857,13 @@ def sync_feeds(args): args.twitter_username, args.twitter_password, args.twitter_email, - args.twitter_handle + args.twitter_handle, ) if not tweets: - logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.") + logging.warning( + "⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle." + ) return bsky_client = None @@ -2371,7 +2871,7 @@ def sync_feeds(args): bsky_client = create_bsky_client( args.bsky_base_url, args.bsky_handle, - args.bsky_password + args.bsky_password, ) recent_bsky_posts = [] @@ -2379,14 +2879,20 @@ def sync_feeds(args): recent_bsky_posts = get_recent_bsky_posts( bsky_client, args.bsky_handle, - limit=DEDUPE_BSKY_LIMIT + limit=DEDUPE_BSKY_LIMIT, ) - logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.") - logging.info(f"🧠 Local state currently tracks {len(state.get('posted_tweets', {}))} posted items.") + logging.info( + f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection." + ) + logging.info( + f"🧠 Local state currently tracks {len(state.get('posted_tweets', {}))} posted items." 
+ ) too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS) - logging.info(f"πŸ•’ Will ignore tweets older than: {too_old_cutoff}") + logging.info( + f"πŸ•’ Will ignore tweets older than: {too_old_cutoff}" + ) candidate_tweets = [] @@ -2397,61 +2903,115 @@ def sync_feeds(args): tweet_time = arrow.get(tweet.created_on) if tweet_time < too_old_cutoff: - logging.info(f"⏭️ Skipping old tweet from {tweet_time}") + logging.info( + f"⏭️ Skipping old tweet from {tweet_time}" + ) continue # --- Retweet filtering --- if tweet.is_retweet: - logging.info(f"⏭️ Skipping retweet/repost: {tweet.tweet_url}") + logging.info( + f"⏭️ Skipping retweet/repost: {tweet.tweet_url}" + ) continue - canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url) - if canonical_tweet_url and canonical_tweet_url in state.get("posted_tweets", {}): - logging.info(f"⚑ Early skip due to known tweet URL in local state: {canonical_tweet_url}") + canonical_tweet_url = canonicalize_tweet_url( + tweet.tweet_url + ) + if canonical_tweet_url and canonical_tweet_url in state.get( + "posted_tweets", {} + ): + logging.info( + f"⚑ Early skip due to known tweet URL in local state: {canonical_tweet_url}" + ) continue scraped_text = clean_post_text(tweet.text or "") if not scraped_text and not tweet.media: - logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}") + logging.info( + f"⏭️ Skipping empty/blank tweet from {tweet_time}" + ) continue - cheap_candidates.append((tweet, tweet_time, canonical_tweet_url)) + cheap_candidates.append( + (tweet, tweet_time, canonical_tweet_url) + ) except Exception as e: - logging.warning(f"⚠️ Failed during cheap prefilter: {e}") + logging.warning( + f"⚠️ Failed during cheap prefilter: {e}" + ) - logging.info(f"⚑ {len(cheap_candidates)} tweets remain after cheap prefilter.") + logging.info( + f"⚑ {len(cheap_candidates)} tweets remain after cheap prefilter." 
+ ) with httpx.Client() as resolve_http_client: - for tweet, tweet_time, canonical_tweet_url in cheap_candidates: + for ( + tweet, + tweet_time, + canonical_tweet_url, + ) in cheap_candidates: try: - full_clean_text, resolved_primary_external_url = build_effective_tweet_text(tweet, resolve_http_client) + ( + full_clean_text, + resolved_primary_external_url, + ) = build_effective_tweet_text( + tweet, resolve_http_client + ) normalized_text = normalize_post_text(full_clean_text) if not normalized_text and not tweet.media: - logging.info(f"⏭️ Skipping empty/blank tweet after enrichment from {tweet_time}") + logging.info( + f"⏭️ Skipping empty/blank tweet after enrichment from {tweet_time}" + ) continue - ordered_non_x_urls = extract_ordered_non_x_urls(full_clean_text) + ordered_non_x_urls = extract_ordered_non_x_urls( + full_clean_text + ) canonical_non_x_urls = set() if resolved_primary_external_url: - canonical_non_x_urls.add(canonicalize_url(resolved_primary_external_url)) + canonical_non_x_urls.add( + canonicalize_url( + resolved_primary_external_url + ) + ) for raw_url in ordered_non_x_urls: - if not is_tco_domain(raw_url) and not is_x_or_twitter_domain(raw_url): - canonical_non_x_urls.add(canonicalize_url(raw_url)) + if not is_tco_domain( + raw_url + ) and not is_x_or_twitter_domain(raw_url): + canonical_non_x_urls.add( + canonicalize_url(raw_url) + ) primary_non_x_url = None if resolved_primary_external_url: - primary_non_x_url = resolved_primary_external_url + primary_non_x_url = ( + resolved_primary_external_url + ) else: - primary_non_x_url = extract_first_visible_non_x_url(full_clean_text) - if not primary_non_x_url and ordered_non_x_urls: + primary_non_x_url = ( + extract_first_visible_non_x_url( + full_clean_text + ) + ) + if ( + not primary_non_x_url + and ordered_non_x_urls + ): primary_non_x_url = ordered_non_x_urls[0] - has_video = any(getattr(m, "type", None) == "video" for m in (tweet.media or [])) - has_photo = any(getattr(m, "type", None) == 
"photo" for m in (tweet.media or [])) + has_video = any( + getattr(m, "type", None) == "video" + for m in (tweet.media or []) + ) + has_photo = any( + getattr(m, "type", None) == "photo" + for m in (tweet.media or []) + ) raw_text = choose_final_visible_text( full_clean_text, @@ -2460,7 +3020,9 @@ def sync_feeds(args): ) media_fingerprint = build_media_fingerprint(tweet) - text_media_key = build_text_media_key(normalized_text, media_fingerprint) + text_media_key = build_text_media_key( + normalized_text, media_fingerprint + ) candidate = { "tweet": tweet, @@ -2475,30 +3037,48 @@ def sync_feeds(args): "ordered_non_x_urls": ordered_non_x_urls, "primary_non_x_url": primary_non_x_url, "resolved_primary_external_url": resolved_primary_external_url, - "looks_like_title_plus_url": looks_like_title_plus_url_post(full_clean_text), + "looks_like_title_plus_url": looks_like_title_plus_url_post( + full_clean_text + ), "has_video": has_video, "has_photo": has_photo, } - is_dup_state, reason_state = candidate_matches_state(candidate, state) + is_dup_state, reason_state = ( + candidate_matches_state(candidate, state) + ) if is_dup_state: - logging.info(f"⏭️ Skipping candidate due to local state duplicate match on: {reason_state}") + logging.info( + f"⏭️ Skipping candidate due to local state duplicate match on: {reason_state}" + ) continue - is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts) + is_dup_bsky, reason_bsky = ( + candidate_matches_existing_bsky( + candidate, recent_bsky_posts + ) + ) if is_dup_bsky: - logging.info(f"⏭️ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}") + logging.info( + f"⏭️ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}" + ) continue candidate_tweets.append(candidate) except Exception as e: - logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}") + logging.warning( + f"⚠️ Failed to prepare candidate tweet: {e}" + ) - logging.info(f"πŸ“¬ 
{len(candidate_tweets)} tweets remain after duplicate filtering.") + logging.info( + f"πŸ“¬ {len(candidate_tweets)} tweets remain after duplicate filtering." + ) if not candidate_tweets: - logging.info("βœ… No new tweets need posting after duplicate comparison.") + logging.info( + "βœ… No new tweets need posting after duplicate comparison." + ) return new_posts = 0 @@ -2507,7 +3087,9 @@ def sync_feeds(args): with sync_playwright() as p, httpx.Client() as media_http_client: browser = p.chromium.launch( headless=True, - args=["--disable-blink-features=AutomationControlled"] + args=[ + "--disable-blink-features=AutomationControlled" + ], ) context_kwargs = { "user_agent": ( @@ -2528,17 +3110,36 @@ def sync_feeds(args): raw_text = candidate["raw_text"] full_clean_text = candidate["full_clean_text"] - logging.info(f"πŸ“ {'[DRY RUN] Would post' if dry_run else 'Posting'} missing tweet from {tweet_time} to Bluesky...") + logging.info( + f"πŸ“ {'[DRY RUN] Would post' if dry_run else 'Posting'} " + f"missing tweet from {tweet_time} to Bluesky..." + ) if dry_run: - logging.info(f" πŸ“„ Text: {raw_text[:200]}{'...' if len(raw_text) > 200 else ''}") - logging.info(f" πŸ”— Primary external URL: {candidate.get('resolved_primary_external_url', 'None')}") - logging.info(f" πŸƒ Card URL: {getattr(tweet, 'card_url', 'None')}") - logging.info(f" 🎬 Has video: {candidate.get('has_video', False)}") - logging.info(f" πŸ–ΌοΈ Has photo: {candidate.get('has_photo', False)}") - logging.info(f" πŸ” Is retweet: {getattr(tweet, 'is_retweet', False)}") + logging.info( + f" πŸ“„ Text: {raw_text[:200]}{'...' 
if len(raw_text) > 200 else ''}" + ) + logging.info( + f" πŸ”— Primary external URL: {candidate.get('resolved_primary_external_url', 'None')}" + ) + logging.info( + f" πŸƒ Card URL: {getattr(tweet, 'card_url', 'None')}" + ) + logging.info( + f" 🎬 Has video: {candidate.get('has_video', False)}" + ) + logging.info( + f" πŸ–ΌοΈ Has photo: {candidate.get('has_photo', False)}" + ) + logging.info( + f" πŸ” Is retweet: {getattr(tweet, 'is_retweet', False)}" + ) - remember_posted_tweet(state, candidate, bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}") + remember_posted_tweet( + state, + candidate, + bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}", + ) save_state(state, STATE_PATH) new_posts += 1 continue @@ -2554,40 +3155,98 @@ def sync_feeds(args): has_video = candidate.get("has_video", False) if has_video: - video_media = next((m for m in (tweet.media or []) if getattr(m, "type", None) == "video"), None) + video_media = next( + ( + m + for m in (tweet.media or []) + if getattr(m, "type", None) == "video" + ), + None, + ) if video_media: if not tweet.tweet_url: - logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.") - media_upload_failures.append("video:no_tweet_url") + logging.warning( + "⚠️ Tweet has video marker but no tweet URL. Skipping video." 
+ ) + media_upload_failures.append( + "video:no_tweet_url" + ) else: - temp_video_base = make_unique_video_temp_base(tweet.tweet_url) - temp_video_path = f"{temp_video_base}.mp4" + temp_video_base = ( + make_unique_video_temp_base( + tweet.tweet_url + ) + ) + temp_video_path = ( + f"{temp_video_base}.mp4" + ) try: - real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url) + real_video_url = ( + extract_video_url_from_tweet_page( + context, tweet.tweet_url + ) + ) if not real_video_url: - logging.warning(f"⚠️ Could not resolve playable video URL for {tweet.tweet_url}") - media_upload_failures.append(f"video:resolve_failed:{tweet.tweet_url}") + logging.warning( + f"⚠️ Could not resolve playable video URL for {tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:resolve_failed:{tweet.tweet_url}" + ) else: - cropped_video_path = download_and_crop_video(real_video_url, temp_video_path) + cropped_video_path = ( + download_and_crop_video( + real_video_url, + temp_video_path, + ) + ) if not cropped_video_path: - logging.warning(f"⚠️ Video download/crop failed for {tweet.tweet_url}") - media_upload_failures.append(f"video:crop_failed:{tweet.tweet_url}") + logging.warning( + f"⚠️ Video download/crop failed for {tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:crop_failed:{tweet.tweet_url}" + ) else: - video_blob = get_blob_from_file(cropped_video_path, bsky_client) + video_blob = ( + get_blob_from_file( + cropped_video_path, + bsky_client, + ) + ) if not video_blob: - logging.warning(f"⚠️ Video upload blob failed for {tweet.tweet_url}") - media_upload_failures.append(f"video:upload_failed:{tweet.tweet_url}") + logging.warning( + f"⚠️ Video upload blob failed for {tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:upload_failed:{tweet.tweet_url}" + ) else: - video_embed = build_video_embed(video_blob, dynamic_alt) + video_embed = ( + build_video_embed( + video_blob, + dynamic_alt, + ) + ) if not video_embed: - 
media_upload_failures.append(f"video:embed_failed:{tweet.tweet_url}") + media_upload_failures.append( + f"video:embed_failed:{tweet.tweet_url}" + ) finally: - remove_file_quietly(temp_video_path) - remove_file_quietly(f"{temp_video_base}_source.mp4") - remove_file_quietly(f"{temp_video_base}_trimmed.mp4") - remove_file_quietly(f"{temp_video_base}_compressed.mp4") + remove_file_quietly( + temp_video_path + ) + remove_file_quietly( + f"{temp_video_base}_source.mp4" + ) + remove_file_quietly( + f"{temp_video_base}_trimmed.mp4" + ) + remove_file_quietly( + f"{temp_video_base}_compressed.mp4" + ) if not video_embed: logging.warning( @@ -2599,38 +3258,55 @@ def sync_feeds(args): if tweet.media: for media in tweet.media: if media.type == "photo": - blob = get_blob_from_url(media.media_url_https, bsky_client, media_http_client) + blob = get_blob_from_url( + media.media_url_https, + bsky_client, + media_http_client, + ) if blob: image_embeds.append( models.AppBskyEmbedImages.Image( alt=dynamic_alt, - image=blob + image=blob, ) ) else: - media_upload_failures.append(f"photo:{media.media_url_https}") + media_upload_failures.append( + f"photo:{media.media_url_https}" + ) - # --- External link card logic (KEY FIX for t.co card URLs) --- + # --- External link card logic --- if not video_embed and not image_embeds: - candidate_url = candidate.get("resolved_primary_external_url") + candidate_url = candidate.get( + "resolved_primary_external_url" + ) if candidate_url: if candidate.get("looks_like_title_plus_url"): - logging.info(f"πŸ”— Detected title+URL post style. Using resolved URL for external card: {candidate_url}") + logging.info( + f"πŸ”— Detected title+URL post style. 
" + f"Using resolved URL for external card: {candidate_url}" + ) else: - logging.info(f"πŸ”— Using resolved first external URL for external card: {candidate_url}") + logging.info( + f"πŸ”— Using resolved first external URL for external card: {candidate_url}" + ) external_embed = build_external_link_embed( candidate_url, bsky_client, media_http_client, - fallback_title="Link" + fallback_title="Link", ) if external_embed: - logging.info(f"βœ… Built external link card for URL: {candidate_url}") + logging.info( + f"βœ… Built external link card for URL: {candidate_url}" + ) else: - logging.info(f"ℹ️ Could not build external link card metadata for URL: {candidate_url}") + logging.info( + f"ℹ️ Could not build external link card metadata for URL: {candidate_url}" + ) try: post_result = None @@ -2638,42 +3314,70 @@ def sync_feeds(args): if video_embed: post_result = send_post_with_retry( - bsky_client, text=rich_text, embed=video_embed, langs=bsky_langs + bsky_client, + text=rich_text, + embed=video_embed, + langs=bsky_langs, ) post_mode = "video" elif image_embeds: - embed = models.AppBskyEmbedImages.Main(images=image_embeds) + embed = models.AppBskyEmbedImages.Main( + images=image_embeds + ) post_result = send_post_with_retry( - bsky_client, text=rich_text, embed=embed, langs=bsky_langs + bsky_client, + text=rich_text, + embed=embed, + langs=bsky_langs, ) post_mode = f"images:{len(image_embeds)}" elif external_embed: post_result = send_post_with_retry( - bsky_client, text=rich_text, embed=external_embed, langs=bsky_langs + bsky_client, + text=rich_text, + embed=external_embed, + langs=bsky_langs, ) post_mode = "external_link_card" else: post_result = send_post_with_retry( - bsky_client, text=rich_text, langs=bsky_langs + bsky_client, + text=rich_text, + langs=bsky_langs, ) post_mode = "text_only" bsky_uri = getattr(post_result, "uri", None) - remember_posted_tweet(state, candidate, bsky_uri=bsky_uri) + remember_posted_tweet( + state, candidate, bsky_uri=bsky_uri + ) state 
= prune_state(state, max_entries=5000) save_state(state, STATE_PATH) - recent_bsky_posts.insert(0, { - "uri": bsky_uri, - "text": raw_text, - "normalized_text": candidate["normalized_text"], - "canonical_non_x_urls": candidate["canonical_non_x_urls"], - "media_fingerprint": candidate["media_fingerprint"], - "text_media_key": candidate["text_media_key"], - "created_at": arrow.utcnow().isoformat(), - }) - recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] + recent_bsky_posts.insert( + 0, + { + "uri": bsky_uri, + "text": raw_text, + "normalized_text": candidate[ + "normalized_text" + ], + "canonical_non_x_urls": candidate[ + "canonical_non_x_urls" + ], + "media_fingerprint": candidate[ + "media_fingerprint" + ], + "text_media_key": candidate[ + "text_media_key" + ], + "created_at": arrow.utcnow().isoformat(), + }, + ) + recent_bsky_posts = recent_bsky_posts[ + :DEDUPE_BSKY_LIMIT + ] new_posts += 1 @@ -2683,16 +3387,22 @@ def sync_feeds(args): f"Failed media items: {media_upload_failures}" ) else: - logging.info(f"βœ… Posted new tweet to Bluesky with mode {post_mode}: {raw_text}") + logging.info( + f"βœ… Posted new tweet to Bluesky with mode {post_mode}: {raw_text}" + ) time.sleep(5) except Exception as e: - logging.error(f"❌ Failed to post tweet to Bluesky: {e}") + logging.error( + f"❌ Failed to post tweet to Bluesky: {e}" + ) browser.close() - logging.info(f"βœ… Sync complete. Posted {new_posts} new updates.") + logging.info( + f"βœ… Sync complete. Posted {new_posts} new updates." 
+ ) except Exception as e: logging.error(f"❌ Error during sync cycle: {e}") @@ -2701,14 +3411,32 @@ def sync_feeds(args): def main(): load_dotenv() - parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync") - parser.add_argument("--twitter-username", help="Your Twitter login username") - parser.add_argument("--twitter-password", help="Your Twitter login password") - parser.add_argument("--twitter-email", help="Your Twitter email for security challenges") - parser.add_argument("--twitter-handle", help="The Twitter account to scrape") - parser.add_argument("--bsky-handle", help="Your Bluesky handle") - parser.add_argument("--bsky-password", help="Your Bluesky app password") - parser.add_argument("--bsky-base-url", help="Bluesky/ATProto PDS base URL, e.g. https://eurosky.social") + parser = argparse.ArgumentParser( + description="Twitter to Bluesky Sync" + ) + parser.add_argument( + "--twitter-username", help="Your Twitter login username" + ) + parser.add_argument( + "--twitter-password", help="Your Twitter login password" + ) + parser.add_argument( + "--twitter-email", + help="Your Twitter email for security challenges", + ) + parser.add_argument( + "--twitter-handle", help="The Twitter account to scrape" + ) + parser.add_argument( + "--bsky-handle", help="Your Bluesky handle" + ) + parser.add_argument( + "--bsky-password", help="Your Bluesky app password" + ) + parser.add_argument( + "--bsky-base-url", + help="Bluesky/ATProto PDS base URL, e.g. 
https://eurosky.social", + ) parser.add_argument( "--bsky-langs", help="Comma-separated language codes for Bluesky posts (default: ca)", @@ -2723,22 +3451,44 @@ def main(): args = parser.parse_args() - args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME") - args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD") + args.twitter_username = args.twitter_username or os.getenv( + "TWITTER_USERNAME" + ) + args.twitter_password = args.twitter_password or os.getenv( + "TWITTER_PASSWORD" + ) args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL") args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") - args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") - args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username - args.bsky_base_url = args.bsky_base_url if args.bsky_base_url else DEFAULT_BSKY_BASE_URL + args.bsky_password = args.bsky_password or os.getenv( + "BSKY_APP_PASSWORD" + ) + args.twitter_handle = ( + args.twitter_handle + or os.getenv("TWITTER_HANDLE") + or args.twitter_username + ) + args.bsky_base_url = ( + args.bsky_base_url + if args.bsky_base_url + else DEFAULT_BSKY_BASE_URL + ) # --- Language handling: CLI > env > default (Catalan) --- raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS") if raw_langs: - args.bsky_langs = [lang.strip() for lang in raw_langs.split(",") if lang.strip()] - logging.info(f"🌍 Using configured Bluesky languages: {args.bsky_langs}") + args.bsky_langs = [ + lang.strip() + for lang in raw_langs.split(",") + if lang.strip() + ] + logging.info( + f"🌍 Using configured Bluesky languages: {args.bsky_langs}" + ) else: args.bsky_langs = DEFAULT_BSKY_LANGS - logging.info(f"🌍 Using default Bluesky languages: {args.bsky_langs}") + logging.info( + f"🌍 Using default Bluesky languages: {args.bsky_langs}" + ) missing_args = [] if not args.twitter_username: @@ -2751,14 +3501,22 @@ def main(): 
missing_args.append("--bsky-password") if missing_args: - logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}") + logging.error( + f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}" + ) return - logging.info(f"πŸ€– Bot started. Will check @{args.twitter_handle}") - logging.info(f"🌍 Posting destination base URL: {args.bsky_base_url}") + logging.info( + f"πŸ€– Bot started. Will check @{args.twitter_handle}" + ) + logging.info( + f"🌍 Posting destination base URL: {args.bsky_base_url}" + ) if args.dry_run: - logging.info("πŸ§ͺ DRY RUN MODE ENABLED β€” no posts will be created.") + logging.info( + "πŸ§ͺ DRY RUN MODE ENABLED β€” no posts will be created." + ) reset_caches() sync_feeds(args)