From b23501583215c3f76f8e6887fcfb77ed84a24136 Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Fri, 8 May 2026 16:47:03 +0000 Subject: [PATCH] Added ffpmeg --- twitter2bsky_daemon.py | 4159 +++++++--------------------------------- 1 file changed, 640 insertions(+), 3519 deletions(-) diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index 061f83d..0c58522 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -1,1378 +1,63 @@ +#!/usr/bin/env python3 +""" +bsky_post.py β€” Post text + optional image or video to Bluesky/federated PDS. + +Includes: +- Robust login backoff +- Reliable video upload via video.bsky.app +- Correct service auth (aud + lxm) +- 409 already_exists handling (reuse jobId) +- SDK compatibility for blob refs +- Explicit record creation (guaranteed text field) +- ffmpeg video compression (enabled by default) +""" + import argparse -import arrow -import hashlib -import html -import io -import json import logging -import re -import httpx -import time +import mimetypes import os -import subprocess -import uuid import random +import re +import secrets +import shutil +import string +import subprocess +import sys +import tempfile +import time +from dataclasses import dataclass from urllib.parse import urlparse -from dotenv import load_dotenv -from atproto import Client, client_utils, models -from playwright.sync_api import sync_playwright -from moviepy import VideoFileClip -from bs4 import BeautifulSoup -from PIL import Image -import grapheme # add to imports at top -# --- Configuration --- -LOG_PATH = "twitter2bsky.log" -STATE_PATH = "twitter2bsky_state.json" -SCRAPE_TWEET_LIMIT = 30 -DEDUPE_BSKY_LIMIT = 30 -TWEET_MAX_AGE_DAYS = 3 -BSKY_TEXT_MAX_LENGTH = 300 -DEFAULT_BSKY_LANGS = ["ca"] - -VIDEO_MAX_DURATION_SECONDS = 179 -MAX_VIDEO_UPLOAD_SIZE_MB = 45 - -BSKY_IMAGE_MAX_BYTES = 950 * 1024 -BSKY_IMAGE_MAX_DIMENSION = 2000 -BSKY_IMAGE_MIN_JPEG_QUALITY = 45 - -EXTERNAL_THUMB_MAX_BYTES = 950 * 1024 -EXTERNAL_THUMB_MAX_DIMENSION = 1200 -EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40 - -BSKY_BLOB_UPLOAD_MAX_RETRIES = 5 -BSKY_BLOB_UPLOAD_BASE_DELAY = 10 -BSKY_BLOB_UPLOAD_MAX_DELAY = 300 -BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3 -BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15 - -BSKY_SEND_POST_MAX_RETRIES = 3 -BSKY_SEND_POST_BASE_DELAY = 5 -BSKY_SEND_POST_MAX_DELAY = 60 - -# --- Login hardening (NEW) --- -BSKY_LOGIN_MAX_RETRIES = 4 -BSKY_LOGIN_BASE_DELAY = 10 -BSKY_LOGIN_MAX_DELAY = 600 -BSKY_LOGIN_JITTER_MAX = 1.5 - -MEDIA_DOWNLOAD_TIMEOUT = 30 -LINK_METADATA_TIMEOUT = 10 -URL_RESOLVE_TIMEOUT = 12 -PLAYWRIGHT_RESOLVE_TIMEOUT_MS = 60000 -SUBPROCESS_TIMEOUT_SECONDS = 180 -FFPROBE_TIMEOUT_SECONDS = 15 -DEFAULT_BSKY_BASE_URL = "https://bsky.social" - -OG_TITLE_WAIT_TIMEOUT_MS = 7000 -PLAYWRIGHT_POST_GOTO_SLEEP_S = 2.0 -PLAYWRIGHT_IDLE_POLL_SLEEP_S = 0.8 -PLAYWRIGHT_IDLE_POLL_ROUNDS = 4 -PLAYWRIGHT_RETRY_SLEEP_S = 2.0 -VIDEO_PLAYER_WAIT_ROUNDS = 8 -VIDEO_PLAYER_RETRY_ROUNDS = 5 -URL_TAIL_MIN_PREFIX_CHARS = 35 -URL_TAIL_MAX_LOOKBACK_CHARS = 120 -URL_TAIL_MAX_CLAUSE_DISTANCE = 180 -DYNAMIC_ALT_MAX_LENGTH = 150 -TRUNCATE_MIN_PREFIX_CHARS = 20 -SHORT_TWEET_OG_FETCH_THRESHOLD = 35 -ORPHAN_DIGIT_MAX_DIGITS = 3 -SESSION_FILE_PERMISSIONS = 0o600 - -# --- Logging Setup --- -logging.basicConfig( - format="%(asctime)s [%(levelname)s] %(message)s", - handlers=[ - logging.FileHandler(LOG_PATH, encoding="utf-8"), - logging.StreamHandler(), - ], - level=logging.INFO, -) - -# --- Per-run caches --- -class _RunCache: - def __init__(self): - self.og_title: dict = {} - self.url_resolution: dict = {} - self.url_validity: dict = {} - self.locale: str = "en-US" # ← ADDED locale cache here - - def clear(self): - self.og_title.clear() - self.url_resolution.clear() - self.url_validity.clear() - -_cache = _RunCache() - - -def reset_caches(): - _cache.clear() - -def grapheme_len(text): - """Return the grapheme cluster count, matching Bluesky's character counting.""" - return grapheme.length(text) - -# BCP-47 language tag β†’ sensible locale for Playwright -_LANG_TO_LOCALE = { - "ca": "ca-ES", - "es": "es-ES", - "en": "en-US", - "fr": "fr-FR", - "de": "de-DE", - "pt": "pt-PT", - "it": "it-IT", - "nl": "nl-NL", - "eu": "eu-ES", - "gl": "gl-ES", -} - -def bsky_langs_to_playwright_locale(bsky_langs): - """ - Convert the first configured Bluesky language tag to a Playwright locale - string (e.g. ['ca'] β†’ 'ca-ES'). Falls back to 'en-US' if unknown. - """ - if not bsky_langs: - return "en-US" - primary = bsky_langs[0].strip().lower() - return _LANG_TO_LOCALE.get(primary, f"{primary}-{primary.upper()}") - -# --- Custom Classes --- -class ScrapedMedia: - def __init__(self, url, media_type="photo"): - self.type = media_type - self.media_url_https = url - - -class ScrapedTweet: - def __init__(self, created_on, text, media_urls, tweet_url=None, card_url=None, is_retweet=False): - self.created_on = created_on - self.text = text - self.tweet_url = tweet_url - self.card_url = card_url - self.is_retweet = is_retweet - self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls] - - -# --- Helpers --- -def take_error_screenshot(page, error_msg): - logging.info(f"πŸ“Έ Taking screenshot... Shot: {error_msg}") - timestamp = time.strftime("%Y%m%d_%H%M%S") - screenshot_name = f"screenshot_{timestamp}.png" - page.screenshot(path=screenshot_name) - logging.info(f"πŸ“Έ Screenshot saved as: {screenshot_name}") - - -def is_valid_url(url): - if url in _cache.url_validity: - return _cache.url_validity[url] - - try: - response = httpx.head(url, timeout=5, follow_redirects=True) - result = response.status_code < 500 - except Exception: - result = False - - _cache.url_validity[url] = result - return result - - -def strip_trailing_url_punctuation(url): - if not url: - return url - # Strip a trailing hashtag-style fragment (#Word) that is really a social - # hashtag glued to the end of a URL with no space, e.g. - # https://cit.transit.gencat.cat#SCT β†’ https://cit.transit.gencat.cat - # Only stripped when it starts with a letter so real anchors like - # /page#section-2 inside a longer sentence are left alone. - url = re.sub(r"#[A-Za-z]\w*$", "", url.strip()) - return re.sub(r"[\s…\.,;:!?)\]\"']+$", "", url) - - -def split_url_hashtag_suffix(text): - """ - Split a URL that has a hashtag fragment glued to it with no space, e.g.: - 'https://cit.transit.gencat.cat#SCT' - becomes: - 'https://cit.transit.gencat.cat #SCT' - - Only splits when the fragment looks like a social hashtag: starts with # - followed by a letter then word characters. The lookahead (?=\\s|$) ensures - we only act at a word boundary so mid-sentence anchors followed by more - URL path are left untouched. - """ - if not text: - return text - - fixed = re.sub( - r"(https?://[^\s#<>\"']+)(#[A-Za-z]\w*)(?=\s|$)", - r"\1 \2", - text, - ) - if fixed != text: - logging.info("πŸ”§ Split hashtag suffix from URL in text") - return fixed - - -def split_concatenated_urls(text): - if not text: - return text - - fixed = re.sub(r"(https?://[^\s]+?)(https?://)", r"\1 \2", text) - if fixed != text: - logging.info("πŸ”§ Split concatenated URLs in text") - return fixed - - -def repair_broken_urls(text): - if not text: - return text - - original = text - text = split_concatenated_urls(text) - # Split glued hashtag suffixes before any rejoining passes - text = split_url_hashtag_suffix(text) - - text = re.sub(r"(https?://)\s*[\r\n]+\s*", r"\1", text, flags=re.IGNORECASE) - - prev_text = None - while prev_text != text: - prev_text = text - text = re.sub( - r"((?:https?://|www\.)[^\s<>\"]*)[\r\n]+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", - r"\1\2", - text, - flags=re.IGNORECASE, - ) - - text = re.sub( - r"((?:https?://|www\.)[^\s<>\"]*)\s+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", - r"\1\2", - text, - flags=re.IGNORECASE, - ) - - text = split_concatenated_urls(text) - # Run hashtag split again after rejoining passes β€” the rejoining regex - # contains # in its character class so it can re-glue a fragment. - text = split_url_hashtag_suffix(text) - - if text != original: - logging.info("πŸ”§ Repaired broken URL wrapping in scraped text") - - return text - - -def repair_broken_mentions(text): - if not text: - return text - - lines = text.splitlines() - result = [] - i = 0 - changed = False - - def is_mention_only_line(s): - return bool(re.fullmatch(r"@[A-Za-z0-9_]+", s.strip())) - - def is_blank_line(s): - return not s.strip() - - while i < len(lines): - current = lines[i] - stripped = current.strip() - - if is_blank_line(current): - result.append("") - i += 1 - continue - - if is_mention_only_line(current): - if result and result[-1].strip(): - result[-1] = result[-1].rstrip() + " " + stripped - changed = True - else: - result.append(stripped) - - i += 1 - - while i < len(lines): - next_line = lines[i] - next_stripped = next_line.strip() - - if is_blank_line(next_line): - break - if is_mention_only_line(next_line): - break - - result[-1] = result[-1].rstrip() + " " + next_stripped - changed = True - i += 1 - - if i < len(lines) and is_blank_line(lines[i]): - break - - continue - - if i + 1 < len(lines) and is_mention_only_line(lines[i + 1]): - merged = stripped + " " + lines[i + 1].strip() - changed = True - i += 2 - - while i < len(lines): - next_line = lines[i] - next_stripped = next_line.strip() - - if is_blank_line(next_line): - break - if is_mention_only_line(next_line): - break - - merged = merged.rstrip() + " " + next_stripped - changed = True - i += 1 - - if i < len(lines) and is_blank_line(lines[i]): - break - - result.append(merged) - continue - - result.append(stripped) - i += 1 - - new_text = "\n".join(result) - - if changed: - logging.info("πŸ”§ Repaired broken mention wrapping in scraped text") - - return new_text - - -def strip_line_edge_whitespace(text): - if not text: - return text - - lines = text.splitlines() - cleaned_lines = [] - changed = False - - for line in lines: - cleaned = line.strip() - if cleaned != line: - changed = True - cleaned_lines.append(cleaned) - - new_text = "\n".join(cleaned_lines) - - if changed: - logging.info("πŸ”§ Stripped leading/trailing whitespace from scraped text lines") - - return new_text - - -def remove_trailing_ellipsis_line(text): - if not text: - return text - - lines = text.splitlines() - - while lines and lines[-1].strip() in {"...", "…"}: - lines.pop() - - return "\n".join(lines).strip() - - -def remove_orphaned_digit_lines_before_hashtags(text): - if not text: - return text - - lines = text.splitlines() - if len(lines) < 2: - return text - - result = [] - changed = False - i = 0 - orphan_pattern = re.compile(rf"\d{{1,{ORPHAN_DIGIT_MAX_DIGITS}}}") - - while i < len(lines): - stripped = lines[i].strip() - - if ( - stripped - and orphan_pattern.fullmatch(stripped) - and i + 1 < len(lines) - and lines[i + 1].strip().startswith("#") - ): - logging.info(f"πŸ”§ Removing orphaned digit line '{stripped}' before hashtag line") - changed = True - i += 1 - continue - - result.append(lines[i]) - i += 1 - - if changed: - return "\n".join(result) - - return text - - -def clean_post_text(text): - raw_text = (text or "").strip() - raw_text = repair_broken_urls(raw_text) - raw_text = repair_broken_mentions(raw_text) - raw_text = strip_line_edge_whitespace(raw_text) - raw_text = remove_trailing_ellipsis_line(raw_text) - raw_text = remove_orphaned_digit_lines_before_hashtags(raw_text) - return raw_text.strip() - - -def clean_url(url): - trimmed_url = url.strip() - cleaned_url = re.sub(r"\s+", "", trimmed_url) - cleaned_url = strip_trailing_url_punctuation(cleaned_url) - - if is_valid_url(cleaned_url): - return cleaned_url - return None - - -def canonicalize_url(url): - if not url: - return None - return strip_trailing_url_punctuation(url.strip()) - - -def normalize_urlish_token(token): - if not token: - return None - - token = strip_trailing_url_punctuation(token.strip()) - if not token: - return None - - if token.startswith(("http://", "https://")): - return token - - if token.startswith("www."): - return f"https://{token}" - - return None - - -def canonicalize_tweet_url(url): - if not url: - return None - - url = url.strip() - match = re.search( - r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", - url, - re.IGNORECASE, - ) - if not match: - return url.lower() - - handle = match.group(1).lower() - tweet_id = match.group(2) - return f"https://x.com/{handle}/status/{tweet_id}" - - -def extract_tweet_id(tweet_url): - if not tweet_url: - return None - match = re.search(r"/status/(\d+)", tweet_url) - if match: - return match.group(1) - return None - - -def make_unique_video_temp_base(tweet_url=None): - tweet_id = extract_tweet_id(tweet_url) or "unknown" - ts_ms = int(time.time() * 1000) - rand = uuid.uuid4().hex[:8] - base = f"temp_video_{tweet_id}_{ts_ms}_{rand}" - logging.info(f"🎞️ Using unique temp video base: {base}") - return base - - -def remove_file_quietly(path): - if path and os.path.exists(path): - try: - os.remove(path) - logging.info(f"🧹 Removed temp file: {path}") - except Exception as e: - logging.warning(f"⚠️ Could not remove temp file {path}: {e}") - - -def is_x_or_twitter_domain(url): - try: - normalized = normalize_urlish_token(url) or url - hostname = (urlparse(normalized).hostname or "").lower() - return hostname in { - "x.com", - "www.x.com", - "twitter.com", - "www.twitter.com", - "mobile.twitter.com", - } - except Exception: - return False - - -def is_tco_domain(url): - try: - normalized = normalize_urlish_token(url) or url - hostname = (urlparse(normalized).hostname or "").lower() - return hostname == "t.co" - except Exception: - return False - - -def is_external_non_x_url(url): - if not url: - return False - return (not is_tco_domain(url)) and (not is_x_or_twitter_domain(url)) - - -def extract_urls_from_text(text): - if not text: - return [] - - repaired = repair_broken_urls(text) - pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' - return re.findall(pattern, repaired) - - -def extract_quoted_text_from_og_title(og_title): - if not og_title: - return None - - decoded = html.unescape(og_title).strip() - match = re.search(r'on X:\s*"(?P.*)"\s*/\s*X\s*$', decoded, flags=re.DOTALL) - if match: - extracted = match.group("text").strip() - if extracted: - return extracted - - first_quote = decoded.find('"') - last_quote = decoded.rfind('"') - if 0 <= first_quote < last_quote: - extracted = decoded[first_quote + 1 : last_quote].strip() - if extracted: - return extracted - - return None - - -def should_fetch_og_title(tweet): - text = clean_post_text(tweet.text or "") - urls = extract_urls_from_text(text) - - if not text: - return True - - if any(is_tco_domain(normalize_urlish_token(u) or u) for u in urls): - return True - - if "…" in text or text.endswith("..."): - return True - - if len(text) < SHORT_TWEET_OG_FETCH_THRESHOLD: - return True - - return False - - -def fetch_tweet_og_title_text(tweet_url, locale="en-US"): - if not tweet_url: - return None - - if tweet_url in _cache.og_title: - logging.info(f"⚑ Using cached og:title text for {tweet_url}") - return _cache.og_title[tweet_url] - - browser = None - browser_context = None - page = None - - try: - logging.info(f"🧾 Fetching og:title from tweet page: {tweet_url}") - - with sync_playwright() as p: - browser = p.chromium.launch( - headless=True, - args=["--disable-blink-features=AutomationControlled"], - ) - browser_context = browser.new_context( - user_agent=( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/145.0.7632.6 Safari/537.36" - ), - viewport={"width": 1280, "height": 900}, - locale=_cache.locale, # ← USE CACHE - ) - page = browser_context.new_page() - page.goto( - tweet_url, - wait_until="domcontentloaded", - timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, - ) - - try: - page.wait_for_selector( - 'meta[property="og:title"]', - timeout=OG_TITLE_WAIT_TIMEOUT_MS, - ) - except Exception: - pass - - og_title = ( - page.locator('meta[property="og:title"]') - .first.get_attribute("content") - ) - extracted = extract_quoted_text_from_og_title(og_title) - - if extracted: - extracted = clean_post_text(extracted) - _cache.og_title[tweet_url] = extracted - logging.info(f"βœ… Extracted tweet text from og:title for {tweet_url}") - return extracted - - logging.info(f"ℹ️ No usable og:title text extracted for {tweet_url}") - _cache.og_title[tweet_url] = None - return None - - except Exception as e: - logging.warning( - f"⚠️ Could not extract og:title text from {tweet_url}: {repr(e)}" - ) - try: - if page: - take_error_screenshot(page, "tweet_og_title_failed") - except Exception: - pass - _cache.og_title[tweet_url] = None - return None - finally: - try: - if page: - page.close() - except Exception: - pass - try: - if browser_context: - browser_context.close() - except Exception: - pass - try: - if browser: - browser.close() - except Exception: - pass - - -def resolve_tco_with_httpx(url, http_client): - try: - response = http_client.get( - url, timeout=URL_RESOLVE_TIMEOUT, follow_redirects=True - ) - final_url = canonicalize_url(str(response.url)) - if final_url: - logging.info(f"πŸ”— Resolved t.co with httpx: {url} -> {final_url}") - return final_url - except Exception as e: - logging.warning(f"⚠️ httpx t.co resolution failed for {url}: {repr(e)}") - - return canonicalize_url(url) - - -def resolve_tco_with_playwright(url, locale="en-US"): - browser = None - browser_context = None - page = None - - try: - logging.info(f"🌐 Resolving t.co with Playwright: {url}") - - with sync_playwright() as p: - browser = p.chromium.launch( - headless=True, - args=["--disable-blink-features=AutomationControlled"], - ) - browser_context = browser.new_context( - user_agent=( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/145.0.7632.6 Safari/537.36" - ), - viewport={"width": 1280, "height": 900}, - locale=locale, - ) - page = browser_context.new_page() - - try: - page.goto( - url, - wait_until="domcontentloaded", - timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, - ) - except Exception as e: - logging.warning( - f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}" - ) - - time.sleep(PLAYWRIGHT_POST_GOTO_SLEEP_S) - final_url = canonicalize_url(page.url) - - for _ in range(PLAYWRIGHT_IDLE_POLL_ROUNDS): - if final_url and is_external_non_x_url(final_url): - break - - try: - page.wait_for_load_state("networkidle", timeout=2000) - except Exception: - pass - - time.sleep(PLAYWRIGHT_IDLE_POLL_SLEEP_S) - final_url = canonicalize_url(page.url) - - logging.info(f"🌐 Playwright final URL for {url}: {final_url}") - return final_url - - except Exception as e: - logging.warning( - f"⚠️ Playwright t.co resolution failed for {url}: {repr(e)}" - ) - try: - if page: - take_error_screenshot(page, "tco_resolve_failed") - except Exception: - pass - finally: - try: - if page: - page.close() - except Exception: - pass - try: - if browser_context: - browser_context.close() - except Exception: - pass - try: - if browser: - browser.close() - except Exception: - pass - - return canonicalize_url(url) - - -def resolve_url_if_needed(url, http_client, allow_playwright_fallback=True): - if not url: - return None - - normalized = normalize_urlish_token(url) or url - cleaned = canonicalize_url(normalized) - if not cleaned: - return None - - if cleaned in _cache.url_resolution: - logging.info( - f"⚑ Using cached URL resolution: {cleaned} -> {_cache.url_resolution[cleaned]}" - ) - return _cache.url_resolution[cleaned] - - if not is_tco_domain(cleaned): - _cache.url_resolution[cleaned] = cleaned - return cleaned - - resolved_http = resolve_tco_with_httpx(cleaned, http_client) - if is_external_non_x_url(resolved_http): - _cache.url_resolution[cleaned] = resolved_http - return resolved_http - - if not allow_playwright_fallback: - _cache.url_resolution[cleaned] = resolved_http - return resolved_http - - resolved_browser = resolve_tco_with_playwright(cleaned) - if is_external_non_x_url(resolved_browser): - logging.info( - f"βœ… Resolved t.co via Playwright to external URL: {resolved_browser}" - ) - _cache.url_resolution[cleaned] = resolved_browser - return resolved_browser - - if resolved_http and not is_tco_domain(resolved_http): - _cache.url_resolution[cleaned] = resolved_http - return resolved_http - - _cache.url_resolution[cleaned] = cleaned - return cleaned - - -def extract_non_x_urls_from_text(text): - urls = extract_urls_from_text(text) - result = [] - - for url in urls: - normalized = normalize_urlish_token(url) - cleaned = strip_trailing_url_punctuation(normalized or url) - if not cleaned: - continue - - if is_tco_domain(cleaned): - result.append(cleaned) - continue - - if not is_x_or_twitter_domain(cleaned): - result.append(cleaned) - - return result - - -def extract_ordered_non_x_urls(text): - seen = set() - ordered = [] - - for url in extract_non_x_urls_from_text(text): - canonical = canonicalize_url(url) - if canonical and canonical not in seen: - seen.add(canonical) - ordered.append(canonical) - - return ordered - - -def extract_first_visible_non_x_url(text): - for url in extract_non_x_urls_from_text(text or ""): - canonical = canonicalize_url(url) - if canonical: - return canonical - return None - - -def extract_first_resolved_external_url( - text, http_client, allow_playwright_fallback=True -): - for url in extract_non_x_urls_from_text(text or ""): - resolved = resolve_url_if_needed( - url, - http_client, - allow_playwright_fallback=allow_playwright_fallback, - ) - if not resolved: - continue - - if is_external_non_x_url(resolved): - logging.info(f"βœ… Selected resolved external URL for card: {resolved}") - return resolved - - return None - - -def resolve_card_url(card_url, http_client): - if not card_url: - return None - - cleaned = canonicalize_url(card_url.strip()) - if not cleaned: - return None - - if is_external_non_x_url(cleaned): - logging.info(f"πŸ”— Card URL is already external: {cleaned}") - return cleaned - - if is_tco_domain(cleaned): - resolved = resolve_url_if_needed( - cleaned, http_client, allow_playwright_fallback=True - ) - if resolved and is_external_non_x_url(resolved): - logging.info(f"πŸ”— Resolved card t.co URL: {cleaned} -> {resolved}") - return resolved - - if is_x_or_twitter_domain(cleaned): - logging.info( - f"ℹ️ Card URL resolves to X/Twitter domain, ignoring: {cleaned}" - ) - return None - - return cleaned - - -def sanitize_visible_urls_in_text(text, http_client, has_media=False): - if not text: - return text, None - - working = clean_post_text(text) - url_pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' - urls = re.findall(url_pattern, working) - - if not urls: - return working, None - - replacements = {} - first_external_resolved = None - - for raw_url in urls: - normalized = normalize_urlish_token(raw_url) - cleaned = canonicalize_url(normalized or raw_url) - if not cleaned: - continue - - if is_x_or_twitter_domain(cleaned): - replacements[raw_url] = "" - logging.info(f"🧹 Removing X/Twitter URL from visible text: {cleaned}") - continue - - final_url = cleaned - if is_tco_domain(cleaned): - resolved_http_first = resolve_tco_with_httpx(cleaned, http_client) - - if is_external_non_x_url(resolved_http_first): - final_url = resolved_http_first - _cache.url_resolution[cleaned] = final_url - else: - if ( - has_media - and resolved_http_first - and is_x_or_twitter_domain(resolved_http_first) - ): - final_url = resolved_http_first - _cache.url_resolution[cleaned] = final_url - logging.info( - f"⚑ Skipping Playwright t.co fallback because tweet has media " - f"and httpx already resolved to X/Twitter URL: {final_url}" - ) - else: - final_url = resolve_url_if_needed( - cleaned, http_client, allow_playwright_fallback=True - ) - - if is_x_or_twitter_domain(final_url): - replacements[raw_url] = "" - logging.info( - f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}" - ) - continue - - if normalized and normalized.startswith("https://www."): - final_url = normalized - elif normalized and normalized.startswith("http://www."): - final_url = normalized - - if is_external_non_x_url(final_url) and not first_external_resolved: - first_external_resolved = final_url - - replacements[raw_url] = final_url - - def replace_match(match): - raw = match.group(0) - return replacements.get(raw, raw) - - working = re.sub(url_pattern, replace_match, working) - - deduped_lines = [] - for line in working.splitlines(): - line_urls = re.findall(url_pattern, line) - if len(line_urls) > 1: - prefix = re.sub(url_pattern, "", line).strip() - kept_urls = [] - seen_in_line: set = set() - - for url in line_urls: - normalized = normalize_urlish_token(url) or url - canonical = canonicalize_url(normalized) - - if not canonical: - continue - if is_x_or_twitter_domain(canonical): - continue - if canonical in seen_in_line: - continue - - seen_in_line.add(canonical) - kept_urls.append(url) - - if prefix and kept_urls: - rebuilt = prefix + " " + " ".join(kept_urls) - elif prefix: - rebuilt = prefix - else: - rebuilt = " ".join(kept_urls) - - deduped_lines.append(rebuilt.strip()) - else: - cleaned_line = re.sub(r"\s{2,}", " ", line).strip() - deduped_lines.append(cleaned_line) - - working = "\n".join(deduped_lines) - working = re.sub(r"[ \t]+", " ", working) - working = re.sub(r"\n{3,}", "\n\n", working).strip() - - return working, first_external_resolved - - -def build_effective_tweet_text(tweet, http_client): - scraped_text = clean_post_text(tweet.text or "") - has_media = bool(tweet.media) - og_title_text = None - - if should_fetch_og_title(tweet): - og_title_text = fetch_tweet_og_title_text(tweet.tweet_url) - - candidate_text = scraped_text - if og_title_text: - scraped_urls = extract_urls_from_text(scraped_text) - og_urls = extract_urls_from_text(og_title_text) - - if len(og_title_text) >= len(scraped_text) or (og_urls and not scraped_urls): - candidate_text = og_title_text - logging.info("🧾 Using og:title-derived tweet text as primary content") - - candidate_text, resolved_primary_external_url = sanitize_visible_urls_in_text( - candidate_text, http_client, has_media=has_media, - ) - candidate_text = clean_post_text(candidate_text) - - resolved_card_url = resolve_card_url( - getattr(tweet, "card_url", None), http_client - ) - - if resolved_card_url and is_external_non_x_url(resolved_card_url): - if not resolved_primary_external_url: - resolved_primary_external_url = resolved_card_url - logging.info( - f"πŸ”— Using resolved card URL as primary external URL: {resolved_card_url}" - ) - elif resolved_primary_external_url != resolved_card_url: - logging.info( - f"ℹ️ Card URL ({resolved_card_url}) differs from text URL " - f"({resolved_primary_external_url}). Preferring card URL for external embed." - ) - resolved_primary_external_url = resolved_card_url - - if not resolved_primary_external_url: - resolved_primary_external_url = extract_first_resolved_external_url( - candidate_text, - http_client, - allow_playwright_fallback=not has_media, - ) - - return candidate_text, resolved_primary_external_url - - -def remove_url_from_visible_text(text, url_to_remove): - if not text or not url_to_remove: - return text - - canonical_target = canonicalize_url(url_to_remove) - lines = text.splitlines() - cleaned_lines = [] - - for line in lines: - line_urls = extract_urls_from_text(line) - new_line = line - - for url in line_urls: - normalized = normalize_urlish_token(url) or url - cleaned_candidate = canonicalize_url( - strip_trailing_url_punctuation(normalized) - ) - if cleaned_candidate == canonical_target: - pattern = re.escape(url) - new_line = re.sub(pattern, "", new_line) - - new_line = re.sub(r"[ \t]+", " ", new_line).strip() - cleaned_lines.append(new_line) - - result = "\n".join(cleaned_lines) - result = re.sub(r"[ \t]+", " ", result) - result = re.sub(r"\n{3,}", "\n\n", result).strip() - - return result - - -def looks_like_title_plus_url_post(text): - if not text: - return False - - repaired = repair_broken_urls(text) - repaired = strip_line_edge_whitespace(repaired) - lines = [line.strip() for line in repaired.splitlines() if line.strip()] - if len(lines) < 2: - return False - - last_line = lines[-1] - urls_in_last_line = extract_ordered_non_x_urls(last_line) - total_urls = extract_ordered_non_x_urls(repaired) - - return ( - len(urls_in_last_line) == 1 - and len(total_urls) == 1 - and last_line.startswith(("http://", "https://", "www.")) +import requests +from atproto import Client, models + + +# ============================================================ +# Config +# ============================================================ +@dataclass(frozen=True) +class RetryConfig: + login_max_attempts: int = 5 + login_base_delay_seconds: float = 10.0 + login_max_delay_seconds: float = 600.0 + login_jitter_seconds: float = 3.0 + + +# ============================================================ +# Logging +# ============================================================ +def setup_logging() -> None: + logging.basicConfig( + format="%(asctime)s %(levelname)s %(message)s", + level=logging.INFO, + stream=sys.stdout, ) -def looks_like_url_and_tag_tail(text, primary_non_x_url=None): - if not text or not primary_non_x_url: - return False - - repaired = repair_broken_urls(text) - idx = repaired.find(primary_non_x_url) - if idx == -1: - return False - - tail = repaired[idx:].strip() - if not tail.startswith(("http://", "https://", "www.")): - return False - - if re.search(r"(?:https?://|www\.)\S+.*#[^\s#]+", tail): - return True - - return False - - -def find_tail_preservation_start(text, primary_non_x_url): - if not text or not primary_non_x_url: - return None - - url_pos = text.find(primary_non_x_url) - if url_pos == -1: - return None - - hashtag_match = re.search(r"\s#[^\s#]+", text[url_pos:]) - has_hashtag_after_url = hashtag_match is not None - - candidates = [url_pos] - - clause_patterns = [ - r"\.\s+", r":\s+", r";\s+", r"!\s+", r"\?\s+", r",\s+", - ] - - before = text[:url_pos] - for pattern in clause_patterns: - for match in re.finditer(pattern, before): - candidates.append(match.end()) - - last_newline = before.rfind("\n") - if last_newline != -1: - candidates.append(last_newline + 1) - - if has_hashtag_after_url: - generous_start = max(0, url_pos - URL_TAIL_MAX_LOOKBACK_CHARS) - while generous_start > 0 and text[generous_start] not in {" ", "\n"}: - generous_start -= 1 - candidates.append(generous_start) - - reasonable_candidates = [ - c for c in candidates - if 0 <= c < url_pos and (url_pos - c) <= URL_TAIL_MAX_CLAUSE_DISTANCE - ] - - if reasonable_candidates: - start = min(reasonable_candidates, key=lambda c: (url_pos - c)) - if url_pos - start < URL_TAIL_MIN_PREFIX_CHARS: - farther = [ - c for c in reasonable_candidates - if url_pos - c >= URL_TAIL_MIN_PREFIX_CHARS - ] - if farther: - start = min(farther, key=lambda c: (url_pos - c)) - return start - - return url_pos - -def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH): - if grapheme_len(text) <= max_length: - return text - - clusters = list(grapheme.graphemes(text)) - truncated = "".join(clusters[:max_length]) - last_space = truncated.rfind(" ") - if last_space > TRUNCATE_MIN_PREFIX_CHARS: - return truncated[:last_space] - return truncated -def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LENGTH): - if ( - not text - or tail_start is None - or tail_start < 0 - or tail_start >= len(text) - ): - return truncate_text_safely(text, max_length) - - if len(text) <= max_length: - return text - - tail = text[tail_start:].strip() - if not tail: - return truncate_text_safely(text, max_length) - - reserve = len(tail) + 1 - if reserve >= max_length: - shortened_tail = tail[-max_length:].strip() - first_space = shortened_tail.find(" ") - if 0 <= first_space <= 30: - shortened_tail = shortened_tail[first_space + 1:].strip() - return shortened_tail - - available_prefix = max_length - reserve - prefix = text[:tail_start].rstrip() - - if len(prefix) > available_prefix: - prefix = prefix[:available_prefix].rstrip() - last_space = prefix.rfind(" ") - if last_space > 20: - prefix = prefix[:last_space].rstrip() - - final_text = f"{prefix} {tail}".strip() - final_text = re.sub(r"[ \t]+", " ", final_text) - final_text = re.sub(r"\n{3,}", "\n\n", final_text).strip() - - if len(final_text) <= max_length: - return final_text - - return truncate_text_safely(text, max_length) - -def choose_final_visible_text( - full_clean_text, primary_non_x_url=None, prefer_full_text_without_url=True -): - text = clean_post_text(full_clean_text or "") - if not text: - return text - - if len(text) <= BSKY_TEXT_MAX_LENGTH: - logging.info( - "🟒 Original cleaned tweet text fits in Bluesky. Preserving exact text." - ) - return text - - if primary_non_x_url: - tail_start = find_tail_preservation_start(text, primary_non_x_url) - - if tail_start is not None: - preserved = truncate_text_preserving_tail( - text, tail_start, BSKY_TEXT_MAX_LENGTH - ) - if preserved and len(preserved) <= BSKY_TEXT_MAX_LENGTH: - logging.info( - "πŸ”— Preserving meaningful ending block with URL/hashtags in visible Bluesky text" - ) - return preserved - - if prefer_full_text_without_url and not looks_like_url_and_tag_tail( - text, primary_non_x_url - ): - text_without_url = remove_url_from_visible_text( - text, primary_non_x_url - ).strip() - if text_without_url and len(text_without_url) <= BSKY_TEXT_MAX_LENGTH: - logging.info( - "πŸ”— Keeping full visible text by removing long external URL from body and using external card" - ) - return text_without_url - - truncated = truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH) - logging.info("βœ‚οΈ Falling back to safe truncation for visible Bluesky text") - return truncated - - -def normalize_post_text(text): - if not text: - return "" - - text = clean_post_text(text) - text = text.replace("\r", "\n") - text = re.sub(r"\s+", " ", text).strip() - return text.lower() - - -def build_media_fingerprint(tweet): - if not tweet or not tweet.media: - return "no-media" - - parts = [] - - for media in tweet.media: - media_type = getattr(media, "type", "unknown") - media_url = getattr(media, "media_url_https", "") or "" - stable_value = media_url - - if media_type == "photo": - stable_value = re.sub(r"[?&]name=\w+", "", stable_value) - stable_value = re.sub(r"[?&]format=\w+", "", stable_value) - elif media_type == "video": - stable_value = canonicalize_tweet_url( - tweet.tweet_url or media_url or "" - ) - - parts.append(f"{media_type}:{stable_value}") - - parts.sort() - raw = "|".join(parts) - return hashlib.sha256(raw.encode("utf-8")).hexdigest() - - -def build_bsky_media_fingerprint(post_view): - try: - embed = getattr(post_view, "embed", None) - if not embed: - return "no-media" - - parts = [] - - images = getattr(embed, "images", None) - if images: - for img in images: - image_obj = getattr(img, "image", None) - ref = ( - getattr(image_obj, "ref", None) - or getattr(image_obj, "cid", None) - or str(image_obj) - ) - parts.append(f"photo:{ref}") - - video = getattr(embed, "video", None) - if video: - ref = ( - getattr(video, "ref", None) - or getattr(video, "cid", None) - or str(video) - ) - parts.append(f"video:{ref}") - - external = getattr(embed, "external", None) - if external: - uri = getattr(external, "uri", None) or str(external) - parts.append(f"external:{uri}") - - if not parts: - return "no-media" - - parts.sort() - raw = "|".join(parts) - return hashlib.sha256(raw.encode("utf-8")).hexdigest() - - except Exception as e: - logging.debug(f"Could not build Bluesky media fingerprint: {e}") - return "no-media" - - -def build_text_media_key(normalized_text, media_fingerprint): - return hashlib.sha256( - f"{normalized_text}||{media_fingerprint}".encode("utf-8") - ).hexdigest() - - -# --- Login hardening helpers (NEW) --- -def is_rate_limited_error(error_obj): +# ============================================================ +# Error helpers +# ============================================================ +def is_rate_limited_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "429" in text @@ -1382,7 +67,7 @@ def is_rate_limited_error(error_obj): ) -def is_auth_error(error_obj): +def is_auth_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "401" in text @@ -1393,7 +78,7 @@ def is_auth_error(error_obj): ) -def is_network_error(error_obj): +def is_network_error(error_obj) -> bool: text = repr(error_obj) signals = [ "ConnectError", @@ -1409,358 +94,7 @@ def is_network_error(error_obj): return any(sig in text for sig in signals) -def create_bsky_client(base_url, handle, password): - normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/") - logging.info(f"πŸ” Connecting Bluesky client via base URL: {normalized_base_url}") - - client = Client(base_url=normalized_base_url) - - max_attempts = BSKY_LOGIN_MAX_RETRIES - base_delay = BSKY_LOGIN_BASE_DELAY - max_delay = BSKY_LOGIN_MAX_DELAY - jitter_max = max(BSKY_LOGIN_JITTER_MAX, 0.0) - - for attempt in range(1, max_attempts + 1): - try: - logging.info(f"πŸ” Bluesky login attempt {attempt}/{max_attempts} for {handle}") - client.login(handle, password) - logging.info("βœ… Bluesky login successful.") - return client - - except Exception as e: - logging.exception("❌ Bluesky login exception") - - # Fail fast on invalid credentials - if is_auth_error(e): - logging.error("❌ Bluesky auth failed (invalid handle/app password).") - raise - - # Respect explicit rate-limit timing - if is_rate_limited_error(e): - if attempt < max_attempts: - wait = get_rate_limit_wait_seconds(e, default_delay=base_delay) - wait = wait + random.uniform(0, jitter_max) - logging.warning( - f"⏳ Bluesky login rate-limited (attempt {attempt}/{max_attempts}). " - f"Retrying in {wait:.1f}s." - ) - time.sleep(wait) - continue - - logging.error("❌ Exhausted Bluesky login retries due to rate limiting.") - raise - - # Retry transient/network problems - if is_network_error(e) or is_transient_error(e): - if attempt < max_attempts: - wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter_max) - logging.warning( - f"⏳ Transient Bluesky login failure (attempt {attempt}/{max_attempts}). " - f"Retrying in {wait:.1f}s." - ) - time.sleep(wait) - continue - - logging.error("❌ Exhausted Bluesky login retries after transient/network errors.") - raise - - # Unknown errors: bounded retry anyway - if attempt < max_attempts: - wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter_max) - logging.warning( - f"⏳ Bluesky login retry for unexpected error " - f"(attempt {attempt}/{max_attempts}) in {wait:.1f}s." - ) - time.sleep(wait) - continue - - raise - - raise RuntimeError("Bluesky login failed after all retries.") - - -# --- State Management --- -def default_state(): - return { - "version": 1, - "posted_tweets": {}, - "posted_by_bsky_uri": {}, - "updated_at": None, - } - - -def load_state(state_path=STATE_PATH): - if not os.path.exists(state_path): - logging.info( - f"🧠 No state file found at {state_path}. Starting with empty memory." - ) - return default_state() - - try: - with open(state_path, "r", encoding="utf-8") as f: - state = json.load(f) - - if not isinstance(state, dict): - logging.warning("⚠️ State file is invalid. Reinitializing.") - return default_state() - - state.setdefault("version", 1) - state.setdefault("posted_tweets", {}) - state.setdefault("posted_by_bsky_uri", {}) - state.setdefault("updated_at", None) - - return state - - except Exception as e: - logging.warning( - f"⚠️ Could not load state file {state_path}: {e}. Reinitializing." - ) - return default_state() - - -def save_state(state, state_path=STATE_PATH): - try: - state["updated_at"] = arrow.utcnow().isoformat() - temp_path = f"{state_path}.tmp" - - with open(temp_path, "w", encoding="utf-8") as f: - json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True) - - os.replace(temp_path, state_path) - logging.info(f"πŸ’Ύ State saved to {state_path}") - - except Exception as e: - logging.error(f"❌ Failed to save state file {state_path}: {e}") - - -def remember_posted_tweet(state, candidate, bsky_uri=None): - canonical_tweet_url = candidate.get("canonical_tweet_url") - fallback_key = f"textmedia:{candidate['text_media_key']}" - state_key = canonical_tweet_url or fallback_key - - record = { - "canonical_tweet_url": canonical_tweet_url, - "normalized_text": candidate["normalized_text"], - "raw_text": candidate["raw_text"], - "full_clean_text": candidate.get("full_clean_text", candidate["raw_text"]), - "media_fingerprint": candidate["media_fingerprint"], - "text_media_key": candidate["text_media_key"], - "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]), - "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []), - "resolved_primary_external_url": candidate.get("resolved_primary_external_url"), - "bsky_uri": bsky_uri, - "tweet_created_on": candidate["tweet"].created_on, - "tweet_url": candidate["tweet"].tweet_url, - "posted_at": arrow.utcnow().isoformat(), - } - - state["posted_tweets"][state_key] = record - - if bsky_uri: - state["posted_by_bsky_uri"][bsky_uri] = state_key - - -def candidate_matches_state(candidate, state): - canonical_tweet_url = candidate["canonical_tweet_url"] - text_media_key = candidate["text_media_key"] - normalized_text = candidate["normalized_text"] - - posted_tweets = state.get("posted_tweets", {}) - - if canonical_tweet_url and canonical_tweet_url in posted_tweets: - return True, "state:tweet_url" - - for _, record in posted_tweets.items(): - if record.get("text_media_key") == text_media_key: - return True, "state:text_media_fingerprint" - - for _, record in posted_tweets.items(): - if record.get("normalized_text") == normalized_text: - return True, "state:normalized_text" - - return False, None - - -def prune_state(state, max_entries=5000): - posted_tweets = state.get("posted_tweets", {}) - - if len(posted_tweets) <= max_entries: - return state - - sortable = [] - for key, record in posted_tweets.items(): - posted_at = record.get("posted_at") or "" - sortable.append((key, posted_at)) - - sortable.sort(key=lambda x: x[1], reverse=True) - keep_keys = {key for key, _ in sortable[:max_entries]} - - new_posted_tweets = { - key: record - for key, record in posted_tweets.items() - if key in keep_keys - } - new_posted_by_bsky_uri = { - bsky_uri: key - for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items() - if key in keep_keys - } - - state["posted_tweets"] = new_posted_tweets - state["posted_by_bsky_uri"] = new_posted_by_bsky_uri - return state - - -# --- Bluesky Feed Helpers --- -def extract_urls_from_facets(record): - urls = [] - - try: - facets = getattr(record, "facets", None) or [] - for facet in facets: - features = getattr(facet, "features", None) or [] - for feature in features: - uri = getattr(feature, "uri", None) - if uri: - urls.append(uri) - except Exception as e: - logging.debug(f"Could not extract facet URLs: {e}") - - return urls - - -def get_recent_bsky_posts(client, handle, limit=30): - recent_posts = [] - - try: - timeline = client.get_author_feed(handle, limit=limit) - - for item in timeline.feed: - try: - if item.reason is not None: - continue - - record = item.post.record - if getattr(record, "reply", None) is not None: - continue - - text = getattr(record, "text", "") or "" - normalized_text = normalize_post_text(text) - - urls = [] - urls.extend(extract_non_x_urls_from_text(text)) - urls.extend(extract_urls_from_facets(record)) - - canonical_non_x_urls = set() - for url in urls: - if not is_tco_domain(url) and not is_x_or_twitter_domain(url): - canonical = canonicalize_url( - normalize_urlish_token(url) or url - ) - if canonical: - canonical_non_x_urls.add(canonical) - - media_fingerprint = build_bsky_media_fingerprint(item.post) - text_media_key = build_text_media_key( - normalized_text, media_fingerprint - ) - - recent_posts.append( - { - "uri": getattr(item.post, "uri", None), - "text": text, - "normalized_text": normalized_text, - "canonical_non_x_urls": canonical_non_x_urls, - "media_fingerprint": media_fingerprint, - "text_media_key": text_media_key, - "created_at": getattr(record, "created_at", None), - } - ) - - except Exception as e: - logging.debug( - f"Skipping one Bluesky feed item during dedupe fetch: {e}" - ) - - except Exception as e: - logging.warning( - f"⚠️ Could not fetch recent Bluesky posts for duplicate detection " - f"(live dedup disabled for this cycle): {e}" - ) - - return recent_posts - - -# --- Upload / Retry Helpers --- -def get_rate_limit_wait_seconds(error_obj, default_delay): - """ - Parse common rate-limit headers and return a bounded wait time in seconds. - Supports: - - retry-after - - x-ratelimit-after - - ratelimit-reset (unix timestamp) - """ - try: - now_ts = int(time.time()) - - # Direct headers on exception - headers = getattr(error_obj, "headers", None) or {} - retry_after = headers.get("retry-after") or headers.get("Retry-After") - if retry_after: - return min(max(int(retry_after), 1), BSKY_LOGIN_MAX_DELAY) - - x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") - if x_after: - return min(max(int(x_after), 1), BSKY_LOGIN_MAX_DELAY) - - reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") - if reset_value: - wait_seconds = max(int(reset_value) - now_ts + 1, default_delay) - return min(wait_seconds, BSKY_LOGIN_MAX_DELAY) - except Exception: - pass - - try: - # Nested response headers - response = getattr(error_obj, "response", None) - headers = getattr(response, "headers", None) or {} - now_ts = int(time.time()) - - retry_after = headers.get("retry-after") or headers.get("Retry-After") - if retry_after: - return min(max(int(retry_after), 1), BSKY_LOGIN_MAX_DELAY) - - x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") - if x_after: - return min(max(int(x_after), 1), BSKY_LOGIN_MAX_DELAY) - - reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") - if reset_value: - wait_seconds = max(int(reset_value) - now_ts + 1, default_delay) - return min(wait_seconds, BSKY_LOGIN_MAX_DELAY) - except Exception: - pass - - # repr fallback parsing - text = repr(error_obj) - m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE) - if m: - return min(max(int(m.group(1)), 1), BSKY_LOGIN_MAX_DELAY) - - m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE) - if m: - return min(max(int(m.group(1)), 1), BSKY_LOGIN_MAX_DELAY) - - m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE) - if m: - now_ts = int(time.time()) - wait_seconds = max(int(m.group(1)) - now_ts + 1, default_delay) - return min(wait_seconds, BSKY_LOGIN_MAX_DELAY) - - return default_delay - - -def is_transient_error(error_obj): +def is_transient_error(error_obj) -> bool: error_text = repr(error_obj) transient_signals = [ "InvokeTimeoutError", @@ -1776,1904 +110,691 @@ def is_transient_error(error_obj): return any(signal in error_text for signal in transient_signals) -def upload_blob_with_retry(client, binary_data, media_label="media"): - last_exception = None - transient_attempts = 0 +def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: + try: + now_ts = int(time.time()) + headers = getattr(error_obj, "headers", None) or {} - for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1): + retry_after = headers.get("retry-after") or headers.get("Retry-After") + if retry_after: + return min(max(float(retry_after), 1.0), max_delay) + + x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") + if x_after: + return min(max(float(x_after), 1.0), max_delay) + + reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") + if reset_value: + wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) + return min(wait_seconds, max_delay) + except Exception: + pass + + text = repr(error_obj) + m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE) + if m: + return min(max(float(m.group(1)), 1.0), max_delay) + + m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE) + if m: + return min(max(float(m.group(1)), 1.0), max_delay) + + m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE) + if m: + now_ts = int(time.time()) + wait_seconds = max(float(m.group(1)) - now_ts + 1.0, default_delay) + return min(wait_seconds, max_delay) + + return default_delay + + +# ============================================================ +# Login with backoff +# ============================================================ +def login_with_backoff( + client: Client, + username: str, + password: str, + service_url: str, + max_attempts: int = 5, + base_delay: float = 10.0, + max_delay: float = 600.0, + jitter: float = 1.5, +) -> bool: + for attempt in range(1, max_attempts + 1): try: - result = client.upload_blob(binary_data) - return result.blob + logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {service_url} as {username}") + client.login(username, password) + logging.info("βœ… Login successful.") + return True except Exception as e: - last_exception = e - error_text = str(e) - is_rate_limited = ( - "429" in error_text or "RateLimitExceeded" in error_text - ) + logging.exception("❌ Login exception") - if is_rate_limited: - backoff_delay = min( - BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), - BSKY_BLOB_UPLOAD_MAX_DELAY, - ) - wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) + if is_auth_error(e): + logging.error("❌ Bad credentials. Check handle/password.") + return False - if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: - logging.warning( - f"⏳ Bluesky blob upload rate-limited for {media_label}. " - f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s." - ) - time.sleep(wait_seconds) + if is_rate_limited_error(e): + if attempt < max_attempts: + wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay) + wait += random.uniform(0, jitter) + logging.warning(f"⏳ Rate-limited. Retrying in {wait:.1f}s...") + time.sleep(wait) continue - else: - logging.warning( - f"❌ Exhausted blob upload retries for {media_label} " - f"after rate limiting: {repr(e)}" - ) - break + logging.error("❌ Exhausted login retries due to rate limiting.") + return False - if ( - is_transient_error(e) - and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES - ): - transient_attempts += 1 - wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts - logging.warning( - f"⏳ Transient blob upload failure for {media_label}: {repr(e)}. " - f"Transient retry {transient_attempts}/" - f"{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s." - ) - time.sleep(wait_seconds) + if is_network_error(e) or is_transient_error(e): + if attempt < max_attempts: + wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) + logging.warning(f"⏳ Transient error. Retrying in {wait:.1f}s...") + time.sleep(wait) + continue + logging.error("❌ Exhausted login retries after transient/network errors.") + return False + + if attempt < max_attempts: + wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) + logging.warning(f"⏳ Unknown login error. Retrying in {wait:.1f}s...") + time.sleep(wait) continue - logging.warning(f"Could not upload {media_label}: {repr(e)}") - - if hasattr(e, "response") and e.response is not None: - try: - logging.warning( - f"Upload response status: {e.response.status_code}" - ) - logging.warning(f"Upload response body: {e.response.text}") - except Exception: - pass - - return None - - logging.warning(f"Could not upload {media_label}: {repr(last_exception)}") - return None + return False -def send_post_with_retry(client, **kwargs): - last_exception = None +# ============================================================ +# Utility +# ============================================================ +def detect_mime_type(path: str) -> str: + mime, _ = mimetypes.guess_type(path) + if mime: + return mime + ext = os.path.splitext(path)[1].lower() + fallbacks = { + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".png": "image/png", + ".gif": "image/gif", + ".webp": "image/webp", + ".mp4": "video/mp4", + ".mov": "video/quicktime", + ".webm": "video/webm", + } + return fallbacks.get(ext, "application/octet-stream") - for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1): + +def wait_with_heartbeat(total_seconds: float, label: str = "processing") -> None: + if total_seconds <= 0: + return + logging.info(f"⏳ Waiting {total_seconds:.0f}s for {label}...") + remaining = total_seconds + while remaining > 0: + step = min(5.0, remaining) + time.sleep(step) + remaining -= step + if remaining > 0: + logging.info(f" ...still waiting ({remaining:.0f}s remaining)...") + logging.info("βœ… Wait complete.") + + +def pds_did_from_service_url(service_url: str) -> str: + host = (urlparse(service_url).hostname or "").lower() + if not host: + raise ValueError(f"Invalid --service URL: {service_url}") + return f"did:web:{host}" + + +def random_video_name(ext: str = ".mp4") -> str: + token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12)) + return f"{int(time.time())}_{token}{ext}" + + +def model_to_dict(obj): + if obj is None: + return None + if hasattr(obj, "model_dump"): + return obj.model_dump(by_alias=True, exclude_none=True) + if hasattr(obj, "dict"): + return obj.dict(by_alias=True, exclude_none=True) + return obj + + +def normalize_blob_for_embed(blob_dict: dict): + BlobRef = getattr(models, "BlobRef", None) + if BlobRef is not None: try: - return client.send_post(**kwargs) + return BlobRef.from_dict(blob_dict) + except Exception: + pass + return blob_dict - except Exception as e: - last_exception = e - error_text = str(e) - is_rate_limited = ( - "429" in error_text or "RateLimitExceeded" in error_text - ) - if is_rate_limited: - backoff_delay = min( - BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), - BSKY_SEND_POST_MAX_DELAY, - ) - wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) +# ============================================================ +# ffmpeg compression +# ============================================================ +def ffmpeg_exists() -> bool: + return shutil.which("ffmpeg") is not None - if attempt < BSKY_SEND_POST_MAX_RETRIES: - logging.warning( - f"⏳ Bluesky send_post rate-limited. " - f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s." - ) - time.sleep(wait_seconds) - continue - else: - logging.error( - f"❌ Exhausted send_post retries after rate limiting: {repr(e)}" - ) - raise - if is_transient_error(e) and attempt < BSKY_SEND_POST_MAX_RETRIES: - wait_seconds = BSKY_SEND_POST_BASE_DELAY * attempt - logging.warning( - f"⏳ Transient send_post failure: {repr(e)}. " - f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s." - ) - time.sleep(wait_seconds) - continue +def ffprobe_exists() -> bool: + return shutil.which("ffprobe") is not None - raise - raise last_exception - -def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES): +def get_video_duration_seconds(path: str) -> float | None: + if not ffprobe_exists(): + return None try: - with Image.open(io.BytesIO(image_bytes)) as img: - img = img.convert("RGB") - - width, height = img.size - max_dim = max(width, height) - - if max_dim > BSKY_IMAGE_MAX_DIMENSION: - scale = BSKY_IMAGE_MAX_DIMENSION / max_dim - new_size = ( - max(1, int(width * scale)), - max(1, int(height * scale)), - ) - img = img.resize(new_size, Image.LANCZOS) - logging.info( - f"πŸ–ΌοΈ Resized post image to {new_size[0]}x{new_size[1]}" - ) - - for quality in [90, 82, 75, 68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]: - out = io.BytesIO() - img.save( - out, - format="JPEG", - quality=quality, - optimize=True, - progressive=True, - ) - data = out.getvalue() - logging.info( - f"πŸ–ΌοΈ Post image candidate size at JPEG quality {quality}: " - f"{len(data)} bytes ({len(data) / 1024:.2f} KB)" - ) - if len(data) <= max_bytes: - return data - - for target_dim in [1800, 1600, 1400, 1200, 1000]: - resized = img.copy() - width, height = resized.size - max_dim = max(width, height) - - if max_dim > target_dim: - scale = target_dim / max_dim - new_size = ( - max(1, int(width * scale)), - max(1, int(height * scale)), - ) - resized = resized.resize(new_size, Image.LANCZOS) - - for quality in [68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]: - out = io.BytesIO() - resized.save( - out, - format="JPEG", - quality=quality, - optimize=True, - progressive=True, - ) - data = out.getvalue() - logging.info( - f"πŸ–ΌοΈ Post image resized to <= {target_dim}px at quality {quality}: " - f"{len(data)} bytes ({len(data) / 1024:.2f} KB)" - ) - if len(data) <= max_bytes: - return data - - except Exception as e: - logging.warning(f"Could not compress post image: {repr(e)}") - - return None - - -def get_blob_from_url(media_url, client, http_client): - try: - r = http_client.get( - media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True - ) - if r.status_code != 200: - logging.warning( - f"Could not fetch media {media_url}: HTTP {r.status_code}" - ) - return None - - content = r.content - if not content: - logging.warning( - f"Could not fetch media {media_url}: empty response body" - ) - return None - - content_type = (r.headers.get("content-type") or "").lower() - upload_bytes = content - - if content_type.startswith("image/"): - original_size = len(content) - logging.info( - f"πŸ–ΌοΈ Downloaded post image {media_url} " - f"({original_size} bytes / {original_size / 1024:.2f} KB)" - ) - - if original_size > BSKY_IMAGE_MAX_BYTES: - logging.info( - f"πŸ–ΌοΈ Post image exceeds safe Bluesky limit " - f"({original_size} bytes > {BSKY_IMAGE_MAX_BYTES} bytes). Compressing..." - ) - compressed = compress_post_image_to_limit( - content, BSKY_IMAGE_MAX_BYTES - ) - if compressed: - upload_bytes = compressed - logging.info( - f"βœ… Post image compressed to {len(upload_bytes)} bytes " - f"({len(upload_bytes) / 1024:.2f} KB)" - ) - else: - logging.warning( - f"⚠️ Could not compress post image to safe limit: {media_url}" - ) - return None - - return upload_blob_with_retry(client, upload_bytes, media_label=media_url) - - except Exception as e: - logging.warning(f"Could not fetch media {media_url}: {repr(e)}") + cmd = [ + "ffprobe", + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + path, + ] + out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, text=True).strip() + return float(out) + except Exception: return None -def get_blob_from_file(file_path, client): +def compress_video_ffmpeg( + input_path: str, + max_size_mb: float = 45.0, + crf: int = 28, + preset: str = "veryfast", + audio_bitrate_k: int = 96, +) -> str | None: + if not ffmpeg_exists(): + logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or run with --no-compress-video.") + return None + + if not os.path.exists(input_path): + logging.error(f"❌ Input video not found: {input_path}") + return None + + src_size_mb = os.path.getsize(input_path) / (1024 * 1024) + logging.info(f"πŸ“¦ Source video size: {src_size_mb:.2f} MB") + + if src_size_mb <= max_size_mb: + logging.info("βœ… Source video already under target size. Skipping compression.") + return input_path + + duration = get_video_duration_seconds(input_path) + target_video_k = 1200 + + if duration and duration > 0: + total_kbps = (max_size_mb * 8192.0) / duration + target_video_k = int(max(300, total_kbps - audio_bitrate_k)) + target_video_k = min(max(target_video_k, 300), 5000) + + fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") + os.close(fd) + + cmd = [ + "ffmpeg", + "-y", + "-i", input_path, + "-c:v", "libx264", + "-preset", preset, + "-crf", str(crf), + "-b:v", f"{target_video_k}k", + "-maxrate", f"{int(target_video_k * 1.3)}k", + "-bufsize", f"{int(target_video_k * 2)}k", + "-vf", "scale='min(1280,iw)':-2", + "-c:a", "aac", + "-b:a", f"{audio_bitrate_k}k", + "-movflags", "+faststart", + out_path, + ] + try: - if not os.path.exists(file_path): - logging.warning( - f"Could not upload local file {file_path}: file does not exist" - ) - return None - - file_size = os.path.getsize(file_path) - file_size_mb = file_size / (1024 * 1024) - logging.info( - f"πŸ“¦ Uploading local file {file_path} ({file_size_mb:.2f} MB)" + f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset}, v_bitrateβ‰ˆ{target_video_k}k)..." ) + subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - if ( - file_path.lower().endswith(".mp4") - and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB - ): - logging.warning( - f"Could not upload local file {file_path}: " - f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)" - ) - return None + out_size_mb = os.path.getsize(out_path) / (1024 * 1024) + logging.info(f"βœ… Compressed video size: {out_size_mb:.2f} MB") - with open(file_path, "rb") as f: - binary_data = f.read() + if out_size_mb < src_size_mb: + return out_path - return upload_blob_with_retry(client, binary_data, media_label=file_path) - - except Exception as e: - logging.warning(f"Could not upload local file {file_path}: {repr(e)}") - - if hasattr(e, "response") and e.response is not None: - try: - logging.warning( - f"Upload response status: {e.response.status_code}" - ) - logging.warning(f"Upload response body: {e.response.text}") - except Exception: - pass + logging.info("ℹ️ Compression not smaller than source. Using original.") + try: + os.remove(out_path) + except Exception: + pass + return input_path + except subprocess.CalledProcessError as e: + logging.error("❌ ffmpeg compression failed.") + if e.stderr: + logging.error(e.stderr[-2000:]) + try: + os.remove(out_path) + except Exception: + pass return None -def compress_external_thumb_to_limit( - image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES -): +# ============================================================ +# Media upload β€” Image +# ============================================================ +def upload_image( + client: Client, + image_path: str, + alt_text: str = "", +) -> models.AppBskyEmbedImages.Image | None: try: - with Image.open(io.BytesIO(image_bytes)) as img: - img = img.convert("RGB") + if not os.path.exists(image_path): + logging.error(f"❌ Image file not found: {image_path}") + return None - width, height = img.size - max_dim = max(width, height) + mime = detect_mime_type(image_path) + with open(image_path, "rb") as f: + data = f.read() - if max_dim > EXTERNAL_THUMB_MAX_DIMENSION: - scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim - new_size = ( - max(1, int(width * scale)), - max(1, int(height * scale)), - ) - img = img.resize(new_size, Image.LANCZOS) - logging.info( - f"πŸ–ΌοΈ Resized external thumb to {new_size[0]}x{new_size[1]}" - ) + logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") + response = client.upload_blob(data) + logging.info("βœ… Image uploaded successfully.") - for quality in [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: - out = io.BytesIO() - img.save( - out, - format="JPEG", - quality=quality, - optimize=True, - progressive=True, - ) - data = out.getvalue() - logging.info( - f"πŸ–ΌοΈ External thumb candidate size at JPEG quality {quality}: " - f"{len(data) / 1024:.2f} KB" - ) - if len(data) <= max_bytes: - return data - - for target_dim in [1000, 900, 800, 700, 600]: - resized = img.copy() - width, height = resized.size - max_dim = max(width, height) - - if max_dim > target_dim: - scale = target_dim / max_dim - new_size = ( - max(1, int(width * scale)), - max(1, int(height * scale)), - ) - resized = resized.resize(new_size, Image.LANCZOS) - - for quality in [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: - out = io.BytesIO() - resized.save( - out, - format="JPEG", - quality=quality, - optimize=True, - progressive=True, - ) - data = out.getvalue() - logging.info( - f"πŸ–ΌοΈ External thumb resized to <= {target_dim}px at quality {quality}: " - f"{len(data) / 1024:.2f} KB" - ) - if len(data) <= max_bytes: - return data + return models.AppBskyEmbedImages.Image(image=response.blob, alt=alt_text) except Exception as e: - logging.warning(f"Could not compress external thumbnail: {repr(e)}") + logging.error(f"❌ Failed to upload image: {repr(e)}") + return None + +# ============================================================ +# Media upload β€” Video via PDS direct fallback +# ============================================================ +def upload_video_via_pds( + client: Client, + video_path: str, + alt_text: str = "", + settle_delay_seconds: float = 30.0, +) -> models.AppBskyEmbedVideo.Main | None: + try: + if not os.path.exists(video_path): + logging.error(f"❌ Video file not found: {video_path}") + return None + + with open(video_path, "rb") as f: + video_bytes = f.read() + + size_mb = len(video_bytes) / (1024 * 1024) + logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") + + response = client.upload_blob(video_bytes) + blob = response.blob + logging.warning("⚠️ [PDS-direct fallback] Blob uploaded. Waiting for indexing...") + + wait_with_heartbeat(settle_delay_seconds, label="PDS/AppView indexing") + + return models.AppBskyEmbedVideo.Main(video=blob, alt=alt_text) + + except Exception as e: + logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") + return None + + +# ============================================================ +# Media upload β€” Video via video.bsky.app (primary) +# ============================================================ +def _extract_service_auth_token(upload_auth) -> str | None: + token = getattr(upload_auth, "token", None) + if token: + return token + if isinstance(upload_auth, dict): + return upload_auth.get("token") return None -def get_external_thumb_blob_from_url(image_url, client, http_client): - try: - r = http_client.get( - image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True - ) - if r.status_code != 200: - logging.warning( - f"Could not fetch external thumb {image_url}: HTTP {r.status_code}" - ) +def _poll_video_job(video_host: str, job_id: str, alt_text: str) -> models.AppBskyEmbedVideo.Main | None: + status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" + deadline = time.time() + 600 + + while time.time() < deadline: + status_resp = requests.get(status_url, params={"jobId": job_id}, timeout=30) + if status_resp.status_code != 200: + logging.error(f"❌ Job status check failed: {status_resp.status_code} - {status_resp.text}") return None - content = r.content - if not content: - logging.warning( - f"Could not fetch external thumb {image_url}: empty body" - ) - return None + status_json = status_resp.json() + job_status = status_json.get("jobStatus", {}) + state = job_status.get("state") - original_size_kb = len(content) / 1024 - logging.info( - f"πŸ–ΌοΈ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)" - ) - - upload_bytes = content - if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES: - logging.info( - f"πŸ–ΌοΈ External thumb exceeds safe limit " - f"({original_size_kb:.2f} KB > {EXTERNAL_THUMB_MAX_BYTES / 1024:.2f} KB). Compressing..." - ) - compressed = compress_external_thumb_to_limit( - upload_bytes, EXTERNAL_THUMB_MAX_BYTES - ) - if compressed: - upload_bytes = compressed - logging.info( - f"βœ… External thumb compressed to {len(upload_bytes) / 1024:.2f} KB" - ) - else: - logging.warning( - "⚠️ Could not compress external thumb to fit limit. Will omit thumbnail." - ) + if state == "JOB_STATE_COMPLETED": + blob_dict = job_status.get("blob") + if not blob_dict: + logging.error(f"❌ No blob in completed job status: {status_json}") return None - else: - logging.info("βœ… External thumb already within safe size limit.") - blob = upload_blob_with_retry( - client, - upload_bytes, - media_label=f"external-thumb:{image_url}", - ) - if blob: - return blob + wait_with_heartbeat(8, label="CDN propagation") + blob_obj = normalize_blob_for_embed(blob_dict) + logging.info("βœ… Video processed successfully.") + return models.AppBskyEmbedVideo.Main(video=blob_obj, alt=alt_text) - logging.warning("⚠️ External thumb upload failed. Will omit thumbnail.") - return None + if state == "JOB_STATE_FAILED": + logging.error(f"❌ Video processing failed: {job_status}") + return None - except Exception as e: - logging.warning( - f"Could not fetch/upload external thumb {image_url}: {repr(e)}" - ) - return None + logging.info(f" ...still processing (state={state})...") + time.sleep(3) + + logging.error("❌ Video processing timed out.") + return None -def fetch_link_metadata(url, http_client): +def upload_video_via_bsky_service( + client: Client, + video_path: str, + service_url: str, + alt_text: str = "", +) -> models.AppBskyEmbedVideo.Main | None: try: - r = http_client.get( - url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True - ) - r.raise_for_status() - soup = BeautifulSoup(r.text, "html.parser") + if not os.path.exists(video_path): + logging.error(f"❌ Video file not found: {video_path}") + return None - title = soup.find("meta", property="og:title") or soup.find("title") - desc = ( - soup.find("meta", property="og:description") - or soup.find("meta", attrs={"name": "description"}) - ) - image = ( - soup.find("meta", property="og:image") - or soup.find("meta", attrs={"name": "twitter:image"}) - ) + with open(video_path, "rb") as f: + video_bytes = f.read() - return { - "title": ( - title["content"] - if title and title.has_attr("content") - else (title.text.strip() if title and title.text else "") - ), - "description": ( - desc["content"] if desc and desc.has_attr("content") else "" - ), - "image": ( - image["content"] if image and image.has_attr("content") else None - ), + size_mb = len(video_bytes) / (1024 * 1024) + logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") + + VIDEO_HOST = "https://video.bsky.app" + pds_did = pds_did_from_service_url(service_url) + + try: + params = models.ComAtprotoServerGetServiceAuth.Params( + aud=pds_did, + lxm="com.atproto.repo.uploadBlob", + exp=int(time.time()) + 60 * 30, + ) + upload_auth = client.com.atproto.server.get_service_auth(params) + except Exception: + upload_auth = client.com.atproto.server.get_service_auth( + { + "aud": pds_did, + "lxm": "com.atproto.repo.uploadBlob", + "exp": int(time.time()) + 60 * 30, + } + ) + + token = _extract_service_auth_token(upload_auth) + if not token: + logging.error("❌ Failed to extract service auth token.") + return None + + user_did = client.me.did + upload_name = random_video_name(".mp4") + logging.info(f"🎞️ Upload name: {upload_name}") + + upload_url = ( + f"{VIDEO_HOST}/xrpc/app.bsky.video.uploadVideo" + f"?did={user_did}&name={upload_name}" + ) + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "video/mp4", } + upload_resp = requests.post(upload_url, headers=headers, data=video_bytes, timeout=180) + + if upload_resp.status_code not in (200, 409): + logging.error(f"❌ video.bsky.app upload failed: {upload_resp.status_code} - {upload_resp.text}") + return None + + body = upload_resp.json() + + if upload_resp.status_code == 409: + if body.get("error") == "already_exists" and body.get("jobId"): + logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") + else: + logging.error(f"❌ video.bsky.app returned 409 without reusable jobId: {body}") + return None + + job_id = body.get("jobId") + if not job_id: + logging.error(f"❌ No jobId returned from video service. Response: {body}") + return None + + logging.info(f"⏳ Job {job_id} accepted β€” polling status...") + return _poll_video_job(VIDEO_HOST, job_id, alt_text=alt_text) + except Exception as e: - logging.warning(f"Could not fetch link metadata for {url}: {repr(e)}") - return {} + logging.error(f"❌ video.bsky.app upload failed: {repr(e)}") + return None -def build_external_link_embed( - url, client, http_client, fallback_title="Link", prefetched_metadata=None, -): - link_metadata = ( - prefetched_metadata - if prefetched_metadata is not None - else fetch_link_metadata(url, http_client) +# ============================================================ +# Video dispatcher +# ============================================================ +def upload_video_smart( + client: Client, + video_path: str, + service_url: str, + alt_text: str = "", + settle_delay_seconds: float = 30.0, + allow_pds_video_fallback: bool = False, +) -> models.AppBskyEmbedVideo.Main | None: + logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") + embed = upload_video_via_bsky_service( + client=client, + video_path=video_path, + service_url=service_url, + alt_text=alt_text, ) + if embed: + return embed - thumb_blob = None - if link_metadata.get("image"): - thumb_blob = get_external_thumb_blob_from_url( - link_metadata["image"], client, http_client - ) - if thumb_blob: - logging.info("βœ… External link card thumbnail prepared successfully") - else: - logging.info("ℹ️ External link card will be posted without thumbnail") - - if ( - link_metadata.get("title") - or link_metadata.get("description") - or thumb_blob - ): - return models.AppBskyEmbedExternal.Main( - external=models.AppBskyEmbedExternal.External( - uri=url, - title=link_metadata.get("title") or fallback_title, - description=link_metadata.get("description") or "", - thumb=thumb_blob, - ) + if allow_pds_video_fallback: + logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") + return upload_video_via_pds( + client=client, + video_path=video_path, + alt_text=alt_text, + settle_delay_seconds=settle_delay_seconds, ) + logging.error("❌ video.bsky.app failed. Not using direct fallback unless enabled.") return None -def make_rich(content): - # NOTE: Bluesky supports native @mention facets, but resolving a Twitter - # handle to a Bluesky DID requires an external lookup. That mapping is not - # available here so @mentions are passed through as plain text. If you add - # a handle-mapping table in the future, call - # text_builder.mention(word, did) here instead of text_builder.text(word). - text_builder = client_utils.TextBuilder() - content = clean_post_text(content) - lines = content.splitlines() - - for line_idx, line in enumerate(lines): - if not line.strip(): - if line_idx < len(lines) - 1: - text_builder.text("\n") - continue - - words = line.split(" ") - for i, word in enumerate(words): - if not word: - if i < len(words) - 1: - text_builder.text(" ") - continue - - cleaned_word = strip_trailing_url_punctuation(word) - normalized_candidate = normalize_urlish_token(cleaned_word) - - if normalized_candidate: - if is_x_or_twitter_domain(normalized_candidate): - text_builder.text(word) - else: - clean_url_value = clean_url(normalized_candidate) - - if clean_url_value and is_valid_url(clean_url_value): - text_builder.link(cleaned_word, clean_url_value) - trailing = word[len(cleaned_word):] - if trailing: - text_builder.text(trailing) - else: - text_builder.text(word) - - elif cleaned_word.startswith("#") and len(cleaned_word) > 1: - clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"") - if clean_tag: - text_builder.tag(cleaned_word, clean_tag) - trailing = word[len(cleaned_word):] - if trailing: - text_builder.text(trailing) - else: - text_builder.text(word) - - else: - text_builder.text(word) - - if i < len(words) - 1: - text_builder.text(" ") - - if line_idx < len(lines) - 1: - text_builder.text("\n") - - return text_builder - - -def build_dynamic_alt(raw_text, link_title=None): - dynamic_alt = clean_post_text(raw_text) - dynamic_alt = dynamic_alt.replace("\n", " ").strip() - dynamic_alt = re.sub( - r"(?:(?:https?://)|(?:www\.))\S+", "", dynamic_alt - ).strip() - - if not dynamic_alt and link_title: - dynamic_alt = link_title.strip() - - if len(dynamic_alt) > DYNAMIC_ALT_MAX_LENGTH: - dynamic_alt = dynamic_alt[:DYNAMIC_ALT_MAX_LENGTH] - elif not dynamic_alt: - dynamic_alt = "Attached video or image from tweet" - - return dynamic_alt - - -def build_video_embed(video_blob, alt_text): - try: - return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text) - except AttributeError: - logging.error( - "❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto." - ) - return None - - -# --- Twitter Scraping --- -def scrape_tweets_via_playwright(username, password, email, target_handle, locale="en-US"): - tweets = [] - state_file = "twitter_browser_state.json" - - if os.path.exists(state_file): - try: - os.chmod(state_file, SESSION_FILE_PERMISSIONS) - except Exception as e: - logging.warning( - f"⚠️ Could not set permissions on {state_file}: {e}" - ) - - with sync_playwright() as p: - browser = p.chromium.launch( - headless=True, - args=["--disable-blink-features=AutomationControlled"], - ) - clean_ua = ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/145.0.7632.6 Safari/537.36" - ) - - browser_context = None - needs_login = True - session_check_page = None - - if os.path.exists(state_file): - logging.info( - "βœ… Found existing browser state. Attempting to bypass login..." - ) - browser_context = browser.new_context( - user_agent=clean_ua, - viewport={"width": 1920, "height": 1080}, - storage_state=state_file, - locale=locale, - ) - session_check_page = browser_context.new_page() - session_check_page.goto("https://x.com/home") - time.sleep(3) - - if ( - session_check_page.locator( - '[data-testid="SideNav_NewTweet_Button"]' - ).is_visible() - or "/home" in session_check_page.url - ): - logging.info("βœ… Session is valid!") - needs_login = False - else: - logging.warning( - "⚠️ Saved session expired or invalid. Re-logging in..." - ) - session_check_page.close() - session_check_page = None - browser_context.close() - browser_context = None - os.remove(state_file) - - if session_check_page is not None: - session_check_page.close() - session_check_page = None - - if needs_login: - logging.info( - "πŸš€ Launching fresh browser for automated Twitter login..." - ) - browser_context = browser.new_context( - user_agent=clean_ua, - viewport={"width": 1920, "height": 1080}, - locale=locale, # βœ… add this - ) - login_page = browser_context.new_page() - - try: - login_page.goto("https://x.com") - sign_in_button = login_page.get_by_text("Sign in", exact=True) - sign_in_button.wait_for(state="visible", timeout=15000) - sign_in_button.click(force=True) - - login_page.wait_for_selector( - 'h1:has-text("Sign in to X")', - state="visible", - timeout=25000, - ) - logging.info(f"πŸ‘€ Entering username: {username}...") - time.sleep(1) - - username_input = login_page.locator( - 'input[autocomplete="username"]' - ).first - username_input.wait_for(state="visible", timeout=15000) - username_input.click(force=True) - username_input.press_sequentially(username, delay=100) - - login_page.locator('button:has-text("Next")').first.click( - force=True - ) - login_page.wait_for_selector( - 'input[name="password"], ' - 'input[data-testid="ocfEnterTextTextInput"], ' - 'input[name="text"]', - timeout=15000, - ) - time.sleep(1) - - if login_page.locator( - 'input[data-testid="ocfEnterTextTextInput"]' - ).is_visible() or login_page.locator( - 'input[name="text"]' - ).is_visible(): - logging.warning( - "πŸ›‘οΈ Security challenge detected! Entering email/phone..." - ) - login_page.fill( - 'input[data-testid="ocfEnterTextTextInput"], ' - 'input[name="text"]', - email, - ) - sec_next = login_page.locator( - '[data-testid="ocfEnterTextNextButton"], ' - 'span:has-text("Next")' - ).first - if sec_next.is_visible(): - sec_next.click(force=True) - else: - login_page.keyboard.press("Enter") - login_page.wait_for_selector( - 'input[name="password"]', timeout=15000 - ) - time.sleep(1) - - logging.info("πŸ”‘ Entering password...") - login_page.fill('input[name="password"]', password) - login_page.locator('span:has-text("Log in")').first.click() - - login_page.wait_for_url("**/home", timeout=40000) - time.sleep(3) - - browser_context.storage_state(path=state_file) - try: - os.chmod(state_file, SESSION_FILE_PERMISSIONS) - except Exception as chmod_err: - logging.warning( - f"⚠️ Could not set permissions on {state_file} " - f"after save: {chmod_err}" - ) - logging.info("βœ… Login successful. Browser state saved.") - - except Exception as e: - take_error_screenshot(login_page, "login_failed") - logging.error(f"❌ Login failed: {e}") - login_page.close() - browser.close() - return [] - - login_page.close() - - logging.info( - f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets..." - ) - scrape_page = browser_context.new_page() - scrape_page.goto(f"https://x.com/{target_handle}") - - try: - scrape_page.wait_for_selector("article", timeout=40000) - time.sleep(2) - - articles = scrape_page.locator("article").all() - logging.info( - f"πŸ“Š Found {len(articles)} tweets on screen. " - f"Parsing up to {SCRAPE_TWEET_LIMIT}..." - ) - - for article in articles[:SCRAPE_TWEET_LIMIT]: - try: - time_el = article.locator("time").first - if not time_el.is_visible(): - continue - - created_at = time_el.get_attribute("datetime") - - tweet_url = None - time_link = article.locator("a:has(time)").first - if time_link.is_visible(): - href = time_link.get_attribute("href") - if href: - tweet_url = ( - f"https://x.com{href}" - if href.startswith("/") - else href - ) - - is_retweet = False - try: - social_context_el = article.locator( - '[data-testid="socialContext"]' - ).first - if social_context_el.is_visible(): - context_text = social_context_el.inner_text().lower() - repost_keywords = [ - "reposted", - "retweeted", - "ha repostejat", - "ha retuitat", - "repostejat", - "ha reposteado", - "retuiteΓ³", - ] - if any(kw in context_text for kw in repost_keywords): - is_retweet = True - logging.info( - f"πŸ” Detected retweet/repost: {tweet_url}" - ) - except Exception: - pass - - text_locator = article.locator( - '[data-testid="tweetText"]' - ).first - text = ( - text_locator.inner_text() - if text_locator.is_visible() - else "" - ) - - media_urls = [] - - photo_locators = article.locator( - '[data-testid="tweetPhoto"] img' - ).all() - for img in photo_locators: - src = img.get_attribute("src") - if src: - src = re.sub(r"&name=\w+", "&name=large", src) - media_urls.append((src, "photo")) - - video_locators = article.locator( - '[data-testid="videoPlayer"]' - ).all() - if video_locators: - media_urls.append((tweet_url or "", "video")) - - card_url = None - try: - card_locator = article.locator( - '[data-testid="card.wrapper"] a[href]' - ).first - if card_locator.is_visible(): - card_href = card_locator.get_attribute("href") - if card_href: - card_url = card_href.strip() - logging.info( - f"πŸƒ Scraped card URL from tweet: {card_url}" - ) - except Exception: - pass - - if not card_url: - try: - card_role_link = article.locator( - '[data-testid="card.wrapper"] [role="link"]' - ).first - if card_role_link.is_visible(): - card_a = card_role_link.locator("a[href]").first - if card_a.is_visible(): - card_href = card_a.get_attribute("href") - if card_href: - card_url = card_href.strip() - logging.info( - f"πŸƒ Scraped card URL (fallback) from tweet: {card_url}" - ) - except Exception: - pass - - tweets.append( - ScrapedTweet( - created_at, - text, - media_urls, - tweet_url=tweet_url, - card_url=card_url, - is_retweet=is_retweet, - ) - ) - - except Exception as e: - logging.warning(f"⚠️ Failed to parse a specific tweet: {e}") - continue - - except Exception as e: - take_error_screenshot(scrape_page, "scrape_failed") - logging.error(f"❌ Failed to scrape profile: {e}") - - browser.close() - return tweets - - -# --- Video Extraction & Processing --- -def extract_video_url_from_tweet_page(browser_context, tweet_url): - page = browser_context.new_page() - best_m3u8_url = None - best_video_mp4_url = None - seen_urls = set() # ← scoped per call, so already reset per tweet βœ… - - def is_audio_only_mp4(url, content_type): - url_l = url.lower() - content_type_l = content_type.lower() - return ( - "/aud/" in url_l - or "/audio/" in url_l - or "mp4a" in url_l - or ("audio/" in content_type_l and "video/" not in content_type_l) - ) - - def handle_response(response): - nonlocal best_m3u8_url, best_video_mp4_url - try: - url = response.url - if url in seen_urls: - return - seen_urls.add(url) - - url_l = url.lower() - content_type = response.headers.get("content-type", "") - content_type_l = content_type.lower() - - if ".m4s" in url_l: - return - - if ( - ".m3u8" in url_l - or "application/vnd.apple.mpegurl" in content_type_l - or "application/x-mpegurl" in content_type_l - ): - if best_m3u8_url is None: - best_m3u8_url = url - logging.info(f"πŸ“Ί Found HLS playlist URL: {url}") - return - - if ( - ".mp4" in url_l - or "video/mp4" in content_type_l - or "audio/mp4" in content_type_l - ): - if is_audio_only_mp4(url, content_type): - logging.info(f"πŸ”‡ Ignoring audio-only MP4: {url}") - return - - if best_video_mp4_url is None: - best_video_mp4_url = url - logging.info(f"πŸŽ₯ Found VIDEO MP4 URL: {url}") - return - - except Exception as e: - logging.debug(f"Response parsing error: {e}") - - page.on("response", handle_response) - - def current_best(): - return best_m3u8_url or best_video_mp4_url +# ============================================================ +# Post creation +# ============================================================ +def post_to_bsky( + client: Client, + text: str, + langs: list[str], + image_path: str | None = None, + video_path: str | None = None, + alt_text: str = "", + service_url: str = "https://bsky.social", + video_settle_delay: float = 30.0, + allow_pds_video_fallback: bool = False, +) -> bool: + post_text = text.strip() + + if not post_text and not image_path and not video_path: + logging.error("❌ Empty post text with no media is not allowed.") + return False try: - logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}") - page.goto(tweet_url, wait_until="domcontentloaded", timeout=40000) - time.sleep(2) + embed_obj = None - player = page.locator('[data-testid="videoPlayer"]').first + if video_path: + logging.info(f"🎬 Preparing video upload: {video_path}") + embed_obj = upload_video_smart( + client=client, + video_path=video_path, + service_url=service_url, + alt_text=alt_text, + settle_delay_seconds=video_settle_delay, + allow_pds_video_fallback=allow_pds_video_fallback, + ) + if not embed_obj: + logging.error("❌ Aborting post: video upload/processing failed.") + return False - if player.count() > 0: - try: - player.scroll_into_view_if_needed(timeout=5000) - except Exception: - pass + elif image_path: + image = upload_image(client, image_path, alt_text=alt_text) + if not image: + logging.error("❌ Aborting post: image upload failed.") + return False + embed_obj = models.AppBskyEmbedImages.Main(images=[image]) - try: - player.click(force=True, timeout=5000) - logging.info("▢️ Clicked video player") - except Exception as e: - logging.info(f"⚠️ First player click failed: {e}") - else: - logging.warning("⚠️ No video player locator found on tweet page") + record = { + "$type": "app.bsky.feed.post", + "text": post_text, + "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), + } - for _ in range(VIDEO_PLAYER_WAIT_ROUNDS): - if current_best(): - break - time.sleep(1) + if langs: + record["langs"] = langs - if not current_best() and player.count() > 0: - logging.info("πŸ” No media URL found yet, retrying player interaction...") - try: - player.click(force=True, timeout=5000) - time.sleep(PLAYWRIGHT_RETRY_SLEEP_S) - except Exception as e: - logging.info(f"⚠️ Retry click failed: {e}") + if embed_obj is not None: + record["embed"] = model_to_dict(embed_obj) - try: - page.keyboard.press("Space") - time.sleep(1) - except Exception: - pass + logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") - for _ in range(VIDEO_PLAYER_RETRY_ROUNDS): - if current_best(): - break - time.sleep(1) - - selected_url = current_best() - if selected_url: - logging.info(f"βœ… Selected media URL for download: {selected_url}") - else: - logging.warning( - f"⚠️ No playable media URL detected on tweet page: {tweet_url}" + try: + resp = client.com.atproto.repo.create_record( + models.ComAtprotoRepoCreateRecord.Data( + repo=client.me.did, + collection="app.bsky.feed.post", + record=record, + ) + ) + except Exception: + resp = client.com.atproto.repo.create_record( + { + "repo": client.me.did, + "collection": "app.bsky.feed.post", + "record": record, + } ) - return selected_url + uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) + logging.info(f"βœ… Post published! URI: {uri}") + return True except Exception as e: - logging.warning( - f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}" - ) - return None - finally: - page.remove_listener("response", handle_response) # ← FIX 1: detach before close - page.close() + logging.error(f"❌ Failed to send post: {repr(e)}") + return False -def _probe_video_duration(file_path): - probe_cmd = [ - "ffprobe", - "-v", "error", - "-show_entries", "format=duration", - "-of", "default=noprint_wrappers=1:nokey=1", - file_path, - ] - try: - result = subprocess.run( - probe_cmd, - capture_output=True, - text=True, - timeout=FFPROBE_TIMEOUT_SECONDS, - ) - if result.returncode != 0: - raise RuntimeError( - f"ffprobe exited with code {result.returncode}: " - f"{result.stderr.strip()}" - ) - duration_str = result.stdout.strip() - if not duration_str: - raise RuntimeError("ffprobe returned empty duration output") - return float(duration_str) - except subprocess.TimeoutExpired: - raise RuntimeError( - f"ffprobe timed out after {FFPROBE_TIMEOUT_SECONDS}s on {file_path}" - ) - - -def download_and_crop_video(video_url, output_path): - temp_input = output_path.replace(".mp4", "_source.mp4") - temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4") - temp_output = output_path.replace(".mp4", "_compressed.mp4") - - try: - logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}") - - video_url_l = video_url.lower() - - if ".m3u8" in video_url_l: - logging.info("πŸ“Ί Using HLS ffmpeg mode") - download_cmd = [ - "ffmpeg", "-y", - "-protocol_whitelist", "file,http,https,tcp,tls,crypto", - "-allowed_extensions", "ALL", - "-i", video_url, - "-c", "copy", - temp_input, - ] - else: - logging.info("πŸŽ₯ Using direct MP4 ffmpeg mode") - download_cmd = [ - "ffmpeg", "-y", - "-i", video_url, - "-c", "copy", - temp_input, - ] - - download_result = subprocess.run( - download_cmd, - capture_output=True, - text=True, - timeout=SUBPROCESS_TIMEOUT_SECONDS, - ) - - if download_result.returncode != 0: - logging.error( - f"❌ ffmpeg download failed:\n{download_result.stderr}" - ) - return None - - if ( - not os.path.exists(temp_input) - or os.path.getsize(temp_input) == 0 - ): - logging.error("❌ Downloaded video source file is missing or empty.") - return None - - logging.info(f"βœ… Video downloaded: {temp_input}") - - try: - duration = _probe_video_duration(temp_input) - except RuntimeError as probe_err: - logging.error(f"❌ Could not probe video duration: {probe_err}") - return None - - if duration <= 0: - logging.error("❌ Downloaded video has invalid or unknown duration.") - return None - - end_time = min(VIDEO_MAX_DURATION_SECONDS, duration) - - video_clip = VideoFileClip(temp_input) - try: - if hasattr(video_clip, "subclipped"): - cropped_clip = video_clip.subclipped(0, end_time) - else: - cropped_clip = video_clip.subclip(0, end_time) - - try: - cropped_clip.write_videofile( - temp_trimmed, - codec="libx264", - audio_codec="aac", - preset="veryfast", - bitrate="1800k", - audio_bitrate="128k", - logger=None, - ) - finally: - cropped_clip.close() - finally: - video_clip.close() - - if ( - not os.path.exists(temp_trimmed) - or os.path.getsize(temp_trimmed) == 0 - ): - logging.error("❌ Trimmed video output is missing or empty.") - return None - - trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024) - logging.info( - f"πŸ“¦ Trimmed video size before compression: {trimmed_size_mb:.2f} MB" - ) - - compress_cmd = [ - "ffmpeg", "-y", - "-i", temp_trimmed, - "-vf", "scale='min(720,iw)':-2", - "-c:v", "libx264", - "-preset", "veryfast", - "-crf", "30", - "-maxrate", "1800k", - "-bufsize", "3600k", - "-c:a", "aac", - "-b:a", "128k", - "-movflags", "+faststart", - temp_output, - ] - - compress_result = subprocess.run( - compress_cmd, - capture_output=True, - text=True, - timeout=SUBPROCESS_TIMEOUT_SECONDS, - ) - - if compress_result.returncode != 0: - logging.error( - f"❌ ffmpeg compression failed:\n{compress_result.stderr}" - ) - return None - - if ( - not os.path.exists(temp_output) - or os.path.getsize(temp_output) == 0 - ): - logging.error("❌ Compressed video output is missing or empty.") - return None - - final_size_mb = os.path.getsize(temp_output) / (1024 * 1024) - logging.info( - f"βœ… Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)" - ) - - os.replace(temp_output, output_path) - logging.info(f"βœ… Final video ready: {output_path}") - return output_path - - except subprocess.TimeoutExpired: - logging.error( - f"❌ ffmpeg subprocess timed out after {SUBPROCESS_TIMEOUT_SECONDS}s" - ) - return None - - except Exception as e: - logging.error(f"❌ Error processing video: {repr(e)}") - return None - - finally: - remove_file_quietly(temp_input) - remove_file_quietly(temp_trimmed) - remove_file_quietly(temp_output) - - -# --- Deduplication --- -def candidate_matches_existing_bsky(candidate, recent_bsky_posts): - candidate_non_x_urls = candidate["canonical_non_x_urls"] - candidate_text_media_key = candidate["text_media_key"] - candidate_normalized_text = candidate["normalized_text"] - - for existing in recent_bsky_posts: - existing_non_x_urls = existing["canonical_non_x_urls"] - - if ( - candidate_non_x_urls - and candidate_non_x_urls == existing_non_x_urls - and candidate_normalized_text == existing["normalized_text"] - ): - return True, "bsky:normalized_text_plus_non_x_urls" - - if candidate_text_media_key == existing["text_media_key"]: - return True, "bsky:text_media_fingerprint" - - if candidate_normalized_text == existing["normalized_text"]: - return True, "bsky:normalized_text" - - return False, None - - -# --- Main Sync Logic --- -def sync_feeds(args): - logging.info("πŸ”„ Starting sync cycle...") - dry_run = getattr(args, "dry_run", False) - bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS - bot_locale = bsky_langs_to_playwright_locale(bsky_langs) # βœ… now defined - _cache.locale = bot_locale - - if dry_run: - logging.info("πŸ§ͺ DRY RUN MODE β€” no posts will be created on Bluesky.") - - try: - state = load_state(STATE_PATH) - state = prune_state(state, max_entries=5000) - - tweets = scrape_tweets_via_playwright( - args.twitter_username, - args.twitter_password, - args.twitter_email, - args.twitter_handle, - locale=bot_locale, - ) - - if not tweets: - logging.warning( - "⚠️ No tweets found or failed to fetch. " - "Skipping Bluesky sync for this cycle." - ) - return - - bsky_client = None - if not dry_run: - bsky_client = create_bsky_client( - args.bsky_base_url, - args.bsky_handle, - args.bsky_password, - ) - - recent_bsky_posts = [] - if not dry_run: - recent_bsky_posts = get_recent_bsky_posts( - bsky_client, - args.bsky_handle, - limit=DEDUPE_BSKY_LIMIT, - ) - - logging.info( - f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts " - f"for duplicate detection." - ) - logging.info( - f"🧠 Local state currently tracks " - f"{len(state.get('posted_tweets', {}))} posted items." - ) - - too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS) - logging.info(f"πŸ•’ Will ignore tweets older than: {too_old_cutoff}") - - candidate_tweets = [] - cheap_candidates = [] - - for tweet in reversed(tweets): - try: - tweet_time = arrow.get(tweet.created_on) - - if tweet_time < too_old_cutoff: - logging.info(f"⏭️ Skipping old tweet from {tweet_time}") - continue - - if tweet.is_retweet: - logging.info( - f"⏭️ Skipping retweet/repost: {tweet.tweet_url}" - ) - continue - - canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url) - if canonical_tweet_url and canonical_tweet_url in state.get( - "posted_tweets", {} - ): - logging.info( - f"⚑ Early skip due to known tweet URL in local state: " - f"{canonical_tweet_url}" - ) - continue - - scraped_text = clean_post_text(tweet.text or "") - if not scraped_text and not tweet.media: - logging.info( - f"⏭️ Skipping empty/blank tweet from {tweet_time}" - ) - continue - - cheap_candidates.append( - (tweet, tweet_time, canonical_tweet_url) - ) - - except Exception as e: - logging.warning(f"⚠️ Failed during cheap prefilter: {e}") - - logging.info( - f"⚑ {len(cheap_candidates)} tweets remain after cheap prefilter." - ) - - with httpx.Client() as resolve_http_client: - for tweet, tweet_time, canonical_tweet_url in cheap_candidates: - try: - ( - full_clean_text, - resolved_primary_external_url, - ) = build_effective_tweet_text(tweet, resolve_http_client) - - normalized_text = normalize_post_text(full_clean_text) - - if not normalized_text and not tweet.media: - logging.info( - f"⏭️ Skipping empty/blank tweet after enrichment " - f"from {tweet_time}" - ) - continue - - ordered_non_x_urls = extract_ordered_non_x_urls( - full_clean_text - ) - - canonical_non_x_urls = set() - if resolved_primary_external_url: - canonical_non_x_urls.add( - canonicalize_url(resolved_primary_external_url) - ) - - for raw_url in ordered_non_x_urls: - if not is_tco_domain( - raw_url - ) and not is_x_or_twitter_domain(raw_url): - canonical_non_x_urls.add(canonicalize_url(raw_url)) - - primary_non_x_url = None - if resolved_primary_external_url: - primary_non_x_url = resolved_primary_external_url - else: - primary_non_x_url = extract_first_visible_non_x_url( - full_clean_text - ) - if not primary_non_x_url and ordered_non_x_urls: - primary_non_x_url = ordered_non_x_urls[0] - - has_video = any( - getattr(m, "type", None) == "video" - for m in (tweet.media or []) - ) - has_photo = any( - getattr(m, "type", None) == "photo" - for m in (tweet.media or []) - ) - - raw_text = choose_final_visible_text( - full_clean_text, - primary_non_x_url=primary_non_x_url, - prefer_full_text_without_url=False, - ) - - media_fingerprint = build_media_fingerprint(tweet) - text_media_key = build_text_media_key( - normalized_text, media_fingerprint - ) - - candidate = { - "tweet": tweet, - "tweet_time": tweet_time, - "raw_text": raw_text, - "full_clean_text": full_clean_text, - "normalized_text": normalized_text, - "media_fingerprint": media_fingerprint, - "text_media_key": text_media_key, - "canonical_tweet_url": canonical_tweet_url, - "canonical_non_x_urls": canonical_non_x_urls, - "ordered_non_x_urls": ordered_non_x_urls, - "primary_non_x_url": primary_non_x_url, - "resolved_primary_external_url": resolved_primary_external_url, - "looks_like_title_plus_url": looks_like_title_plus_url_post( - full_clean_text - ), - "has_video": has_video, - "has_photo": has_photo, - } - - is_dup_state, reason_state = candidate_matches_state( - candidate, state - ) - if is_dup_state: - logging.info( - f"⏭️ Skipping candidate due to local state duplicate " - f"match on: {reason_state}" - ) - continue - - is_dup_bsky, reason_bsky = candidate_matches_existing_bsky( - candidate, recent_bsky_posts - ) - if is_dup_bsky: - logging.info( - f"⏭️ Skipping candidate due to recent Bluesky duplicate " - f"match on: {reason_bsky}" - ) - continue - - candidate_tweets.append(candidate) - - except Exception as e: - logging.warning( - f"⚠️ Failed to prepare candidate tweet: {e}" - ) - - logging.info( - f"πŸ“¬ {len(candidate_tweets)} tweets remain after duplicate filtering." - ) - - if not candidate_tweets: - logging.info( - "βœ… No new tweets need posting after duplicate comparison." - ) - return - - new_posts = 0 - browser_state_file = "twitter_browser_state.json" - - with sync_playwright() as p, httpx.Client() as media_http_client: - browser = p.chromium.launch( - headless=True, - args=["--disable-blink-features=AutomationControlled"], - ) - context_kwargs = { - "user_agent": ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/145.0.7632.6 Safari/537.36" - ), - "viewport": {"width": 1920, "height": 1080}, - "locale": bot_locale, - } - if os.path.exists(browser_state_file): - context_kwargs["storage_state"] = browser_state_file - - browser_context = browser.new_context(**context_kwargs) - - for candidate in candidate_tweets: - tweet = candidate["tweet"] - tweet_time = candidate["tweet_time"] - raw_text = candidate["raw_text"] - full_clean_text = candidate["full_clean_text"] - - logging.info( - f"πŸ“ {'[DRY RUN] Would post' if dry_run else 'Posting'} " - f"missing tweet from {tweet_time} to Bluesky..." - ) - - if dry_run: - logging.info( - f" πŸ“„ Text: {raw_text[:200]}" - ) - logging.info( - f" πŸ”— Primary external URL: " - f"{candidate.get('resolved_primary_external_url', 'None')}" - ) - logging.info( - f" πŸƒ Card URL: {getattr(tweet, 'card_url', 'None')}" - ) - logging.info( - f" 🎬 Has video: {candidate.get('has_video', False)}" - ) - logging.info( - f" πŸ–ΌοΈ Has photo: {candidate.get('has_photo', False)}" - ) - logging.info( - f" πŸ” Is retweet: {getattr(tweet, 'is_retweet', False)}" - ) - - remember_posted_tweet( - state, - candidate, - bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}", - ) - save_state(state, STATE_PATH) - new_posts += 1 - continue - - link_meta_for_alt: dict = {} - if candidate.get("resolved_primary_external_url"): - try: - link_meta_for_alt = fetch_link_metadata( - candidate["resolved_primary_external_url"], - media_http_client, - ) - except Exception: - pass - - rich_text = make_rich(raw_text) - dynamic_alt = build_dynamic_alt( - full_clean_text, - link_title=link_meta_for_alt.get("title"), - ) - - image_embeds = [] - video_embed = None - external_embed = None - media_upload_failures = [] - - has_video = candidate.get("has_video", False) - - if has_video: - video_media = next( - ( - m - for m in (tweet.media or []) - if getattr(m, "type", None) == "video" - ), - None, - ) - - if video_media: - if not tweet.tweet_url: - logging.warning( - "⚠️ Tweet has video marker but no tweet URL. " - "Skipping video." - ) - media_upload_failures.append("video:no_tweet_url") - else: - temp_video_base = make_unique_video_temp_base( - tweet.tweet_url - ) - temp_video_path = f"{temp_video_base}.mp4" - - try: - real_video_url = ( - extract_video_url_from_tweet_page( - browser_context, tweet.tweet_url - ) - ) - if not real_video_url: - logging.warning( - f"⚠️ Could not resolve playable video URL " - f"for {tweet.tweet_url}" - ) - media_upload_failures.append( - f"video:resolve_failed:{tweet.tweet_url}" - ) - else: - cropped_video_path = download_and_crop_video( - real_video_url, temp_video_path - ) - if not cropped_video_path: - logging.warning( - f"⚠️ Video download/crop failed for " - f"{tweet.tweet_url}" - ) - media_upload_failures.append( - f"video:crop_failed:{tweet.tweet_url}" - ) - else: - video_blob = get_blob_from_file( - cropped_video_path, bsky_client - ) - if not video_blob: - logging.warning( - f"⚠️ Video upload blob failed for " - f"{tweet.tweet_url}" - ) - media_upload_failures.append( - f"video:upload_failed:{tweet.tweet_url}" - ) - else: - video_embed = build_video_embed( - video_blob, dynamic_alt - ) - if not video_embed: - media_upload_failures.append( - f"video:embed_failed:{tweet.tweet_url}" - ) - finally: - remove_file_quietly(temp_video_path) - remove_file_quietly( - f"{temp_video_base}_source.mp4" - ) - remove_file_quietly( - f"{temp_video_base}_trimmed.mp4" - ) - remove_file_quietly( - f"{temp_video_base}_compressed.mp4" - ) - - if not video_embed: - logging.warning( - "⚠️ Tweet contains video, but video could not be " - "posted. Skipping photo fallback for this tweet." - ) - - else: - if tweet.media: - for media in tweet.media: - if media.type == "photo": - blob = get_blob_from_url( - media.media_url_https, - bsky_client, - media_http_client, - ) - if blob: - image_embeds.append( - models.AppBskyEmbedImages.Image( - alt=dynamic_alt, - image=blob, - ) - ) - else: - media_upload_failures.append( - f"photo:{media.media_url_https}" - ) - - # --- External link card logic --- - if not video_embed and not image_embeds: - candidate_url = candidate.get( - "resolved_primary_external_url" - ) - - if candidate_url: - if candidate.get("looks_like_title_plus_url"): - logging.info( - f"πŸ”— Detected title+URL post style. " - f"Using resolved URL for external card: " - f"{candidate_url}" - ) - else: - logging.info( - f"πŸ”— Using resolved first external URL for " - f"external card: {candidate_url}" - ) - - external_embed = build_external_link_embed( - candidate_url, - bsky_client, - media_http_client, - fallback_title="Link", - prefetched_metadata=link_meta_for_alt or None, - ) - - if external_embed: - logging.info( - f"βœ… Built external link card for URL: " - f"{candidate_url}" - ) - else: - logging.info( - f"ℹ️ Could not build external link card metadata " - f"for URL: {candidate_url}" - ) - - try: - post_result = None - post_mode = "text" - - if video_embed: - post_result = send_post_with_retry( - bsky_client, - text=rich_text, - embed=video_embed, - langs=bsky_langs, - ) - post_mode = "video" - elif image_embeds: - embed = models.AppBskyEmbedImages.Main( - images=image_embeds - ) - post_result = send_post_with_retry( - bsky_client, - text=rich_text, - embed=embed, - langs=bsky_langs, - ) - post_mode = f"images:{len(image_embeds)}" - elif external_embed: - post_result = send_post_with_retry( - bsky_client, - text=rich_text, - embed=external_embed, - langs=bsky_langs, - ) - post_mode = "external_link_card" - else: - post_result = send_post_with_retry( - bsky_client, - text=rich_text, - langs=bsky_langs, - ) - post_mode = "text_only" - - bsky_uri = getattr(post_result, "uri", None) - - remember_posted_tweet( - state, candidate, bsky_uri=bsky_uri - ) - state = prune_state(state, max_entries=5000) - save_state(state, STATE_PATH) - - recent_bsky_posts.insert( - 0, - { - "uri": bsky_uri, - "text": raw_text, - "normalized_text": candidate["normalized_text"], - "canonical_non_x_urls": candidate[ - "canonical_non_x_urls" - ], - "media_fingerprint": candidate["media_fingerprint"], - "text_media_key": candidate["text_media_key"], - "created_at": arrow.utcnow().isoformat(), - }, - ) - recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] - - new_posts += 1 - - if media_upload_failures: - logging.warning( - f"βœ… Posted tweet to Bluesky with degraded media " - f"mode ({post_mode}). " - f"Failed media items: {media_upload_failures}" - ) - else: - logging.info( - f"βœ… Posted new tweet to Bluesky with mode " - f"{post_mode}: {raw_text}" - ) - - time.sleep(5) - - except Exception as e: - logging.error( - f"❌ Failed to post tweet to Bluesky: {e}" - ) - - browser.close() - - logging.info( - f"βœ… Sync complete. Posted {new_posts} new updates." - ) - - except Exception as e: - logging.error(f"❌ Error during sync cycle: {e}") +# ============================================================ +# CLI +# ============================================================ def main(): - load_dotenv() + setup_logging() - parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync") + parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") + parser.add_argument("text", help="Post text content") + parser.add_argument("--username", required=True, help="Bluesky handle or email") + parser.add_argument("--password", required=True, help="Bluesky app password") + parser.add_argument("--service", default="https://bsky.social", help="PDS URL") + parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)") + parser.add_argument("--image", default=None, help="Path to image file") + parser.add_argument("--video", default=None, help="Path to video file") + parser.add_argument("--alt", default="", help="Alt text for media") parser.add_argument( - "--twitter-username", - help="Your Twitter login username", + "--video-settle-delay", + type=float, + default=30.0, + help="Seconds to wait after direct-PDS fallback upload before posting.", ) parser.add_argument( - "--twitter-password", - # NOTE (FIX #15): passwords passed via CLI are visible in `ps aux`. - # Prefer setting TWITTER_PASSWORD in your .env file instead. - help="Your Twitter login password", - ) - parser.add_argument( - "--twitter-email", - help="Your Twitter email for security challenges", - ) - parser.add_argument( - "--twitter-handle", - help="The Twitter account to scrape", - ) - parser.add_argument( - "--bsky-handle", - help="Your Bluesky handle", - ) - parser.add_argument( - "--bsky-password", - # NOTE (FIX #15): same warning as --twitter-password above. - # Prefer setting BSKY_APP_PASSWORD in your .env file instead. - help="Your Bluesky app password", - ) - parser.add_argument( - "--bsky-base-url", - help="Bluesky/ATProto PDS base URL, e.g. https://eurosky.social", - ) - parser.add_argument( - "--bsky-langs", - help="Comma-separated language codes for Bluesky posts (default: ca)", - default=None, - ) - parser.add_argument( - "--dry-run", + "--allow-pds-video-fallback", action="store_true", - default=False, - help=( - "Simulate sync without posting to Bluesky. " - "Logs what would be posted." - ), + help="Allow direct PDS video fallback if video.bsky.app fails.", ) + # Compression defaults ON + parser.add_argument( + "--compress-video", + dest="compress_video", + action="store_true", + default=True, + help="Compress video with ffmpeg before upload (default: enabled).", + ) + parser.add_argument( + "--no-compress-video", + dest="compress_video", + action="store_false", + help="Disable ffmpeg compression.", + ) + parser.add_argument("--max-video-mb", type=float, default=45.0, help="Target max size (MB) after compression.") + parser.add_argument("--ffmpeg-crf", type=int, default=28, help="ffmpeg CRF (lower=better quality, larger file).") + parser.add_argument("--ffmpeg-preset", default="veryfast", help="ffmpeg preset (ultrafast..veryslow).") + args = parser.parse_args() - # Resolve credentials: CLI args take priority, then env vars. - # FIX #15 β€” env vars are the secure path; CLI args expose secrets in - # the process list. Operators should prefer .env / environment variables. - args.twitter_username = args.twitter_username or os.getenv( - "TWITTER_USERNAME" - ) - args.twitter_password = args.twitter_password or os.getenv( - "TWITTER_PASSWORD" - ) - args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL") - args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") - args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") - args.twitter_handle = ( - args.twitter_handle - or os.getenv("TWITTER_HANDLE") - or args.twitter_username - ) - args.bsky_base_url = ( - args.bsky_base_url - if args.bsky_base_url - else DEFAULT_BSKY_BASE_URL - ) + if args.image and args.video: + logging.error("❌ Use either --image or --video, not both.") + sys.exit(1) - # --- Language handling: CLI > env > default (Catalan) --- - raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS") - if raw_langs: - args.bsky_langs = [ - lang.strip() - for lang in raw_langs.split(",") - if lang.strip() - ] - logging.info( - f"🌍 Using configured Bluesky languages: {args.bsky_langs}" - ) - else: - args.bsky_langs = DEFAULT_BSKY_LANGS - logging.info( - f"🌍 Using default Bluesky languages: {args.bsky_langs}" + client = Client(base_url=args.service) + + success = login_with_backoff( + client=client, + username=args.username, + password=args.password, + service_url=args.service, + max_attempts=RetryConfig.login_max_attempts, + base_delay=RetryConfig.login_base_delay_seconds, + max_delay=RetryConfig.login_max_delay_seconds, + jitter=RetryConfig.login_jitter_seconds, + ) + if not success: + sys.exit(1) + + langs = [l.strip() for l in args.lang.split(",") if l.strip()] + + video_path_for_upload = args.video + temp_compressed_path = None + + if args.video and args.compress_video: + compressed = compress_video_ffmpeg( + input_path=args.video, + max_size_mb=args.max_video_mb, + crf=args.ffmpeg_crf, + preset=args.ffmpeg_preset, + audio_bitrate_k=96, ) + if compressed is None: + logging.error("❌ Compression failed; aborting.") + sys.exit(1) + video_path_for_upload = compressed + if compressed != args.video: + temp_compressed_path = compressed - missing_args = [] - if not args.twitter_username: - missing_args.append("--twitter-username / TWITTER_USERNAME") - if not args.twitter_password: - missing_args.append("--twitter-password / TWITTER_PASSWORD") - if not args.bsky_handle: - missing_args.append("--bsky-handle / BSKY_HANDLE") - if not args.bsky_password: - missing_args.append("--bsky-password / BSKY_APP_PASSWORD") - - if missing_args: - logging.error( - f"❌ Missing credentials! You forgot to provide: " - f"{', '.join(missing_args)}" - ) - return - - logging.info(f"πŸ€– Bot started. Will check @{args.twitter_handle}") - logging.info( - f"🌍 Posting destination base URL: {args.bsky_base_url}" + post_success = post_to_bsky( + client=client, + text=args.text, + langs=langs, + image_path=args.image, + video_path=video_path_for_upload, + alt_text=args.alt, + service_url=args.service, + video_settle_delay=args.video_settle_delay, + allow_pds_video_fallback=args.allow_pds_video_fallback, ) - if args.dry_run: - logging.info( - "πŸ§ͺ DRY RUN MODE ENABLED β€” no posts will be created." - ) + try: + if temp_compressed_path and os.path.exists(temp_compressed_path): + os.remove(temp_compressed_path) + logging.info(f"🧹 Removed temp file: {temp_compressed_path}") + except Exception: + pass - reset_caches() - sync_feeds(args) - logging.info("πŸ€– Bot finished.") + if not post_success: + sys.exit(1) if __name__ == "__main__": - main() \ No newline at end of file + main()