diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index f2234ed..061f83d 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -1,604 +1,3679 @@ -#!/usr/bin/env python3 -""" -Post text + optional image/video to Bluesky/federated PDS. - -Key reliability choices: -- Video uploads go through https://video.bsky.app first (best client playback compatibility). -- getServiceAuth uses: - aud = did:web: - lxm = com.atproto.repo.uploadBlob -- Handles 409 already_exists from video service by reusing jobId. -- Uses raw lexicon dict embeds (NO AppBskyEmbedVideo typed model), avoiding BlobRef SDK mismatch. -- Optional direct-PDS fallback for video. -- ffmpeg compression enabled by default (disable with --no-compress-video). -""" - import argparse +import arrow +import hashlib +import html +import io +import json import logging -import mimetypes -import os -import random -import secrets -import shutil -import string -import subprocess -import sys -import tempfile +import re +import httpx import time +import os +import subprocess +import uuid +import random from urllib.parse import urlparse +from dotenv import load_dotenv +from atproto import Client, client_utils, models +from playwright.sync_api import sync_playwright +from moviepy import VideoFileClip +from bs4 import BeautifulSoup +from PIL import Image +import grapheme # add to imports at top -import requests -from atproto import Client +# --- Configuration --- +LOG_PATH = "twitter2bsky.log" +STATE_PATH = "twitter2bsky_state.json" +SCRAPE_TWEET_LIMIT = 30 +DEDUPE_BSKY_LIMIT = 30 +TWEET_MAX_AGE_DAYS = 3 +BSKY_TEXT_MAX_LENGTH = 300 +DEFAULT_BSKY_LANGS = ["ca"] + +VIDEO_MAX_DURATION_SECONDS = 179 +MAX_VIDEO_UPLOAD_SIZE_MB = 45 + +BSKY_IMAGE_MAX_BYTES = 950 * 1024 +BSKY_IMAGE_MAX_DIMENSION = 2000 +BSKY_IMAGE_MIN_JPEG_QUALITY = 45 + +EXTERNAL_THUMB_MAX_BYTES = 950 * 1024 +EXTERNAL_THUMB_MAX_DIMENSION = 1200 +EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40 + +BSKY_BLOB_UPLOAD_MAX_RETRIES = 5 +BSKY_BLOB_UPLOAD_BASE_DELAY = 10 +BSKY_BLOB_UPLOAD_MAX_DELAY = 300 +BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3 +BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15 + +BSKY_SEND_POST_MAX_RETRIES = 3 +BSKY_SEND_POST_BASE_DELAY = 5 +BSKY_SEND_POST_MAX_DELAY = 60 + +# --- Login hardening (NEW) --- +BSKY_LOGIN_MAX_RETRIES = 4 +BSKY_LOGIN_BASE_DELAY = 10 +BSKY_LOGIN_MAX_DELAY = 600 +BSKY_LOGIN_JITTER_MAX = 1.5 + +MEDIA_DOWNLOAD_TIMEOUT = 30 +LINK_METADATA_TIMEOUT = 10 +URL_RESOLVE_TIMEOUT = 12 +PLAYWRIGHT_RESOLVE_TIMEOUT_MS = 60000 +SUBPROCESS_TIMEOUT_SECONDS = 180 +FFPROBE_TIMEOUT_SECONDS = 15 +DEFAULT_BSKY_BASE_URL = "https://bsky.social" + +OG_TITLE_WAIT_TIMEOUT_MS = 7000 +PLAYWRIGHT_POST_GOTO_SLEEP_S = 2.0 +PLAYWRIGHT_IDLE_POLL_SLEEP_S = 0.8 +PLAYWRIGHT_IDLE_POLL_ROUNDS = 4 +PLAYWRIGHT_RETRY_SLEEP_S = 2.0 +VIDEO_PLAYER_WAIT_ROUNDS = 8 +VIDEO_PLAYER_RETRY_ROUNDS = 5 +URL_TAIL_MIN_PREFIX_CHARS = 35 +URL_TAIL_MAX_LOOKBACK_CHARS = 120 +URL_TAIL_MAX_CLAUSE_DISTANCE = 180 +DYNAMIC_ALT_MAX_LENGTH = 150 +TRUNCATE_MIN_PREFIX_CHARS = 20 +SHORT_TWEET_OG_FETCH_THRESHOLD = 35 +ORPHAN_DIGIT_MAX_DIGITS = 3 +SESSION_FILE_PERMISSIONS = 0o600 + +# --- Logging Setup --- +logging.basicConfig( + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[ + logging.FileHandler(LOG_PATH, encoding="utf-8"), + logging.StreamHandler(), + ], + level=logging.INFO, +) + +# --- Per-run caches --- +class _RunCache: + def __init__(self): + self.og_title: dict = {} + self.url_resolution: dict = {} + self.url_validity: dict = {} + self.locale: str = "en-US" # ← ADDED locale cache here + + def clear(self): + self.og_title.clear() + self.url_resolution.clear() + self.url_validity.clear() + +_cache = _RunCache() -# --------------------------- -# Logging -# --------------------------- -def setup_logging() -> None: - logging.basicConfig( - format="%(asctime)s %(levelname)s %(message)s", - level=logging.INFO, - stream=sys.stdout, +def reset_caches(): + _cache.clear() + +def grapheme_len(text): + """Return the grapheme cluster count, matching Bluesky's character counting.""" + return grapheme.length(text) + +# BCP-47 language tag β†’ sensible locale for Playwright +_LANG_TO_LOCALE = { + "ca": "ca-ES", + "es": "es-ES", + "en": "en-US", + "fr": "fr-FR", + "de": "de-DE", + "pt": "pt-PT", + "it": "it-IT", + "nl": "nl-NL", + "eu": "eu-ES", + "gl": "gl-ES", +} + +def bsky_langs_to_playwright_locale(bsky_langs): + """ + Convert the first configured Bluesky language tag to a Playwright locale + string (e.g. ['ca'] β†’ 'ca-ES'). Falls back to 'en-US' if unknown. + """ + if not bsky_langs: + return "en-US" + primary = bsky_langs[0].strip().lower() + return _LANG_TO_LOCALE.get(primary, f"{primary}-{primary.upper()}") + +# --- Custom Classes --- +class ScrapedMedia: + def __init__(self, url, media_type="photo"): + self.type = media_type + self.media_url_https = url + + +class ScrapedTweet: + def __init__(self, created_on, text, media_urls, tweet_url=None, card_url=None, is_retweet=False): + self.created_on = created_on + self.text = text + self.tweet_url = tweet_url + self.card_url = card_url + self.is_retweet = is_retweet + self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls] + + +# --- Helpers --- +def take_error_screenshot(page, error_msg): + logging.info(f"πŸ“Έ Taking screenshot... Shot: {error_msg}") + timestamp = time.strftime("%Y%m%d_%H%M%S") + screenshot_name = f"screenshot_{timestamp}.png" + page.screenshot(path=screenshot_name) + logging.info(f"πŸ“Έ Screenshot saved as: {screenshot_name}") + + +def is_valid_url(url): + if url in _cache.url_validity: + return _cache.url_validity[url] + + try: + response = httpx.head(url, timeout=5, follow_redirects=True) + result = response.status_code < 500 + except Exception: + result = False + + _cache.url_validity[url] = result + return result + + +def strip_trailing_url_punctuation(url): + if not url: + return url + # Strip a trailing hashtag-style fragment (#Word) that is really a social + # hashtag glued to the end of a URL with no space, e.g. + # https://cit.transit.gencat.cat#SCT β†’ https://cit.transit.gencat.cat + # Only stripped when it starts with a letter so real anchors like + # /page#section-2 inside a longer sentence are left alone. + url = re.sub(r"#[A-Za-z]\w*$", "", url.strip()) + return re.sub(r"[\s…\.,;:!?)\]\"']+$", "", url) + + +def split_url_hashtag_suffix(text): + """ + Split a URL that has a hashtag fragment glued to it with no space, e.g.: + 'https://cit.transit.gencat.cat#SCT' + becomes: + 'https://cit.transit.gencat.cat #SCT' + + Only splits when the fragment looks like a social hashtag: starts with # + followed by a letter then word characters. The lookahead (?=\\s|$) ensures + we only act at a word boundary so mid-sentence anchors followed by more + URL path are left untouched. + """ + if not text: + return text + + fixed = re.sub( + r"(https?://[^\s#<>\"']+)(#[A-Za-z]\w*)(?=\s|$)", + r"\1 \2", + text, + ) + if fixed != text: + logging.info("πŸ”§ Split hashtag suffix from URL in text") + return fixed + + +def split_concatenated_urls(text): + if not text: + return text + + fixed = re.sub(r"(https?://[^\s]+?)(https?://)", r"\1 \2", text) + if fixed != text: + logging.info("πŸ”§ Split concatenated URLs in text") + return fixed + + +def repair_broken_urls(text): + if not text: + return text + + original = text + text = split_concatenated_urls(text) + # Split glued hashtag suffixes before any rejoining passes + text = split_url_hashtag_suffix(text) + + text = re.sub(r"(https?://)\s*[\r\n]+\s*", r"\1", text, flags=re.IGNORECASE) + + prev_text = None + while prev_text != text: + prev_text = text + text = re.sub( + r"((?:https?://|www\.)[^\s<>\"]*)[\r\n]+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", + r"\1\2", + text, + flags=re.IGNORECASE, + ) + + text = re.sub( + r"((?:https?://|www\.)[^\s<>\"]*)\s+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", + r"\1\2", + text, + flags=re.IGNORECASE, ) + text = split_concatenated_urls(text) + # Run hashtag split again after rejoining passes β€” the rejoining regex + # contains # in its character class so it can re-glue a fragment. + text = split_url_hashtag_suffix(text) -# --------------------------- -# Login helpers -# --------------------------- -def is_auth_error(exc: Exception) -> bool: - t = repr(exc).lower() - return "401" in t or "403" in t or "invalid identifier or password" in t + if text != original: + logging.info("πŸ”§ Repaired broken URL wrapping in scraped text") + + return text -def is_rate_limited_error(exc: Exception) -> bool: - t = repr(exc).lower() - return "429" in t or "ratelimit" in t or "too many requests" in t +def repair_broken_mentions(text): + if not text: + return text + lines = text.splitlines() + result = [] + i = 0 + changed = False -def is_transient_error(exc: Exception) -> bool: - t = repr(exc) - needles = [ - "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", - "TimeoutException", "InvokeTimeoutError", "502", "503", "504", - ] - return any(n in t for n in needles) + def is_mention_only_line(s): + return bool(re.fullmatch(r"@[A-Za-z0-9_]+", s.strip())) + def is_blank_line(s): + return not s.strip() -def login_with_backoff( - client: Client, - username: str, - password: str, - service_url: str, - max_attempts: int = 5, - base_delay: float = 8.0, - max_delay: float = 120.0, -) -> bool: - for attempt in range(1, max_attempts + 1): - try: - logging.info(f"πŸ”‘ Login attempt {attempt}/{max_attempts} β†’ {service_url} as {username}") - client.login(username, password) - logging.info("βœ… Login successful.") - return True - except Exception as e: - logging.exception("❌ Login failed") - if is_auth_error(e): - logging.error("❌ Authentication failed. Check handle/app-password.") - return False - if attempt >= max_attempts: - return False - if is_rate_limited_error(e) or is_transient_error(e): - wait = min(base_delay * attempt, max_delay) + random.uniform(0.1, 1.2) - logging.warning(f"⏳ Retrying login in {wait:.1f}s...") - time.sleep(wait) - else: - return False - return False + while i < len(lines): + current = lines[i] + stripped = current.strip() - -# --------------------------- -# Generic helpers -# --------------------------- -def wait_with_heartbeat(seconds: float, label: str) -> None: - if seconds <= 0: - return - logging.info(f"⏳ Waiting {seconds:.0f}s for {label}...") - remaining = seconds - while remaining > 0: - step = min(5.0, remaining) - time.sleep(step) - remaining -= step - if remaining > 0: - logging.info(f" ...still waiting ({remaining:.0f}s remaining)...") - logging.info("βœ… Wait complete.") - - -def pds_did_from_service_url(service_url: str) -> str: - host = (urlparse(service_url).hostname or "").lower() - if not host: - raise ValueError(f"Invalid --service URL: {service_url}") - return f"did:web:{host}" - - -def random_video_name(ext: str = ".mp4") -> str: - token = "".join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(12)) - return f"{int(time.time())}_{token}{ext}" - - -def extract_token_from_service_auth(resp_obj) -> str | None: - tok = getattr(resp_obj, "token", None) - if tok: - return tok - if isinstance(resp_obj, dict): - return resp_obj.get("token") - return None - - -def extract_blob_from_upload_blob_result(resp_obj): - blob = getattr(resp_obj, "blob", None) - if blob is not None: - return blob - if isinstance(resp_obj, dict): - return resp_obj.get("blob") - return None - - -# --------------------------- -# ffmpeg compression -# --------------------------- -def ffmpeg_exists() -> bool: - return shutil.which("ffmpeg") is not None - - -def ffprobe_exists() -> bool: - return shutil.which("ffprobe") is not None - - -def get_video_duration_seconds(path: str) -> float | None: - if not ffprobe_exists(): - return None - try: - out = subprocess.check_output( - [ - "ffprobe", "-v", "error", - "-show_entries", "format=duration", - "-of", "default=noprint_wrappers=1:nokey=1", - path, - ], - stderr=subprocess.STDOUT, - text=True, - ).strip() - return float(out) - except Exception: - return None - - -def compress_video_ffmpeg( - input_path: str, - max_size_mb: float = 45.0, - crf: int = 28, - preset: str = "veryfast", - audio_bitrate_k: int = 96, -) -> str | None: - if not ffmpeg_exists(): - logging.error("❌ ffmpeg not found in PATH. Install ffmpeg or use --no-compress-video.") - return None - if not os.path.exists(input_path): - logging.error(f"❌ Video file not found: {input_path}") - return None - - src_size_mb = os.path.getsize(input_path) / (1024 * 1024) - logging.info(f"πŸ“¦ Source video size: {src_size_mb:.2f} MB") - if src_size_mb <= max_size_mb: - logging.info("βœ… Already below target size. Skipping compression.") - return input_path - - duration = get_video_duration_seconds(input_path) - target_video_k = 1200 - if duration and duration > 0: - total_kbps = (max_size_mb * 8192.0) / duration - target_video_k = int(max(300, total_kbps - audio_bitrate_k)) - target_video_k = min(max(target_video_k, 300), 5000) - - fd, out_path = tempfile.mkstemp(prefix="bsky_vid_", suffix=".mp4") - os.close(fd) - - cmd = [ - "ffmpeg", "-y", - "-i", input_path, - "-c:v", "libx264", - "-preset", preset, - "-crf", str(crf), - "-b:v", f"{target_video_k}k", - "-maxrate", f"{int(target_video_k * 1.3)}k", - "-bufsize", f"{int(target_video_k * 2)}k", - "-vf", "scale='min(1280,iw)':-2", - "-c:a", "aac", - "-b:a", f"{audio_bitrate_k}k", - "-movflags", "+faststart", - out_path, - ] - - try: - logging.info(f"πŸ› οΈ Compressing video (target≀{max_size_mb}MB, crf={crf}, preset={preset})...") - subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - out_size_mb = os.path.getsize(out_path) / (1024 * 1024) - logging.info(f"βœ… Compressed size: {out_size_mb:.2f} MB") - if out_size_mb < src_size_mb: - return out_path - os.remove(out_path) - logging.info("ℹ️ Compression not smaller than source. Using original.") - return input_path - except subprocess.CalledProcessError as e: - logging.error("❌ ffmpeg compression failed.") - if e.stderr: - logging.error(e.stderr[-2000:]) - try: - os.remove(out_path) - except Exception: - pass - return None - - -# --------------------------- -# Media upload: image -# --------------------------- -def upload_image_embed_dict(client: Client, image_path: str, alt_text: str = "") -> dict | None: - if not os.path.exists(image_path): - logging.error(f"❌ Image file not found: {image_path}") - return None - - mime, _ = mimetypes.guess_type(image_path) - with open(image_path, "rb") as f: - data = f.read() - - logging.info(f"πŸ–ΌοΈ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime or 'unknown'})") - try: - up = client.upload_blob(data) - blob = extract_blob_from_upload_blob_result(up) - if blob is None: - logging.error("❌ uploadBlob returned no blob for image.") - return None - - # Raw lexicon dict embed (cross-SDK safe) - return { - "$type": "app.bsky.embed.images", - "images": [ - { - "alt": alt_text or "", - "image": blob, - } - ], - } - except Exception as e: - logging.error(f"❌ Image upload failed: {repr(e)}") - return None - - -# --------------------------- -# Media upload: video via video.bsky.app (primary) -# --------------------------- -def upload_video_via_video_service_embed_dict( - client: Client, - video_path: str, - service_url: str, - alt_text: str = "", -) -> dict | None: - if not os.path.exists(video_path): - logging.error(f"❌ Video file not found: {video_path}") - return None - - with open(video_path, "rb") as f: - video_bytes = f.read() - - size_mb = len(video_bytes) / (1024 * 1024) - logging.info(f"🎬 [video.bsky.app] Uploading: {video_path} ({size_mb:.2f} MB)") - - video_host = "https://video.bsky.app" - pds_did = pds_did_from_service_url(service_url) - - # getServiceAuth from user's PDS with correct audience + method binding - try: - auth_resp = client.com.atproto.server.get_service_auth( - {"aud": pds_did, "lxm": "com.atproto.repo.uploadBlob", "exp": int(time.time()) + 1800} - ) - except Exception as e: - logging.error(f"❌ getServiceAuth failed: {repr(e)}") - return None - - token = extract_token_from_service_auth(auth_resp) - if not token: - logging.error("❌ getServiceAuth returned no token.") - return None - - upload_name = random_video_name(".mp4") - logging.info(f"🎞️ Upload name: {upload_name}") - - upload_url = f"{video_host}/xrpc/app.bsky.video.uploadVideo?did={client.me.did}&name={upload_name}" - headers = { - "Authorization": f"Bearer {token}", - "Content-Type": "video/mp4", - } - - try: - r = requests.post(upload_url, headers=headers, data=video_bytes, timeout=240) - except Exception as e: - logging.error(f"❌ video upload request failed: {repr(e)}") - return None - - if r.status_code not in (200, 409): - logging.error(f"❌ video.bsky.app upload failed: {r.status_code} - {r.text}") - return None - - payload = r.json() - - # Dedupe path: reuse existing job - if r.status_code == 409: - if payload.get("error") == "already_exists" and payload.get("jobId"): - logging.info("ℹ️ Video already processed on video.bsky.app. Reusing existing job.") - else: - logging.error(f"❌ 409 without reusable jobId: {payload}") - return None - - job_id = payload.get("jobId") - if not job_id: - logging.error(f"❌ No jobId in video upload response: {payload}") - return None - - logging.info(f"⏳ Job {job_id} accepted β€” polling status...") - - status_url = f"{video_host}/xrpc/app.bsky.video.getJobStatus" - deadline = time.time() + 600 # 10 min max poll - - while time.time() < deadline: - try: - s = requests.get(status_url, params={"jobId": job_id}, timeout=30) - except Exception as e: - logging.warning(f"⚠️ Status poll request failed once: {repr(e)}") - time.sleep(3) + if is_blank_line(current): + result.append("") + i += 1 continue - if s.status_code != 200: - logging.error(f"❌ Job status failed: {s.status_code} - {s.text}") - return None + if is_mention_only_line(current): + if result and result[-1].strip(): + result[-1] = result[-1].rstrip() + " " + stripped + changed = True + else: + result.append(stripped) - body = s.json() - job_status = body.get("jobStatus", {}) - state = job_status.get("state") + i += 1 - if state == "JOB_STATE_COMPLETED": - blob = job_status.get("blob") - if not blob: - logging.error(f"❌ Completed job without blob: {body}") - return None + while i < len(lines): + next_line = lines[i] + next_stripped = next_line.strip() - wait_with_heartbeat(8, "CDN propagation") + if is_blank_line(next_line): + break + if is_mention_only_line(next_line): + break - # RAW embed dict; no BlobRef conversion at all. - return { - "$type": "app.bsky.embed.video", - "video": blob, - "alt": alt_text or "", - } + result[-1] = result[-1].rstrip() + " " + next_stripped + changed = True + i += 1 - if state == "JOB_STATE_FAILED": - logging.error(f"❌ Video processing failed: {job_status}") - return None + if i < len(lines) and is_blank_line(lines[i]): + break - logging.info(f" ...still processing (state={state})...") - time.sleep(3) + continue - logging.error("❌ Video processing timed out.") + if i + 1 < len(lines) and is_mention_only_line(lines[i + 1]): + merged = stripped + " " + lines[i + 1].strip() + changed = True + i += 2 + + while i < len(lines): + next_line = lines[i] + next_stripped = next_line.strip() + + if is_blank_line(next_line): + break + if is_mention_only_line(next_line): + break + + merged = merged.rstrip() + " " + next_stripped + changed = True + i += 1 + + if i < len(lines) and is_blank_line(lines[i]): + break + + result.append(merged) + continue + + result.append(stripped) + i += 1 + + new_text = "\n".join(result) + + if changed: + logging.info("πŸ”§ Repaired broken mention wrapping in scraped text") + + return new_text + + +def strip_line_edge_whitespace(text): + if not text: + return text + + lines = text.splitlines() + cleaned_lines = [] + changed = False + + for line in lines: + cleaned = line.strip() + if cleaned != line: + changed = True + cleaned_lines.append(cleaned) + + new_text = "\n".join(cleaned_lines) + + if changed: + logging.info("πŸ”§ Stripped leading/trailing whitespace from scraped text lines") + + return new_text + + +def remove_trailing_ellipsis_line(text): + if not text: + return text + + lines = text.splitlines() + + while lines and lines[-1].strip() in {"...", "…"}: + lines.pop() + + return "\n".join(lines).strip() + + +def remove_orphaned_digit_lines_before_hashtags(text): + if not text: + return text + + lines = text.splitlines() + if len(lines) < 2: + return text + + result = [] + changed = False + i = 0 + orphan_pattern = re.compile(rf"\d{{1,{ORPHAN_DIGIT_MAX_DIGITS}}}") + + while i < len(lines): + stripped = lines[i].strip() + + if ( + stripped + and orphan_pattern.fullmatch(stripped) + and i + 1 < len(lines) + and lines[i + 1].strip().startswith("#") + ): + logging.info(f"πŸ”§ Removing orphaned digit line '{stripped}' before hashtag line") + changed = True + i += 1 + continue + + result.append(lines[i]) + i += 1 + + if changed: + return "\n".join(result) + + return text + + +def clean_post_text(text): + raw_text = (text or "").strip() + raw_text = repair_broken_urls(raw_text) + raw_text = repair_broken_mentions(raw_text) + raw_text = strip_line_edge_whitespace(raw_text) + raw_text = remove_trailing_ellipsis_line(raw_text) + raw_text = remove_orphaned_digit_lines_before_hashtags(raw_text) + return raw_text.strip() + + +def clean_url(url): + trimmed_url = url.strip() + cleaned_url = re.sub(r"\s+", "", trimmed_url) + cleaned_url = strip_trailing_url_punctuation(cleaned_url) + + if is_valid_url(cleaned_url): + return cleaned_url return None -# --------------------------- -# Media upload: direct PDS fallback (optional) -# --------------------------- -def upload_video_via_pds_embed_dict( - client: Client, - video_path: str, - alt_text: str = "", - settle_delay_seconds: float = 30.0, -) -> dict | None: - if not os.path.exists(video_path): - logging.error(f"❌ Video file not found: {video_path}") +def canonicalize_url(url): + if not url: + return None + return strip_trailing_url_punctuation(url.strip()) + + +def normalize_urlish_token(token): + if not token: return None - try: - with open(video_path, "rb") as f: - b = f.read() - - size_mb = len(b) / (1024 * 1024) - logging.warning(f"🎬 [PDS-direct fallback] Uploading: {video_path} ({size_mb:.2f} MB)") - up = client.upload_blob(b) - blob = extract_blob_from_upload_blob_result(up) - if blob is None: - logging.error("❌ PDS uploadBlob returned no blob.") - return None - - wait_with_heartbeat(settle_delay_seconds, "PDS/AppView indexing") - - return { - "$type": "app.bsky.embed.video", - "video": blob, - "alt": alt_text or "", - } - except Exception as e: - logging.error(f"❌ PDS-direct video upload failed: {repr(e)}") + token = strip_trailing_url_punctuation(token.strip()) + if not token: return None + if token.startswith(("http://", "https://")): + return token -def upload_video_smart_embed_dict( - client: Client, - video_path: str, - service_url: str, - alt_text: str = "", - settle_delay_seconds: float = 30.0, - allow_pds_video_fallback: bool = False, -) -> dict | None: - logging.info(f"🌍 PDS ({service_url}). Trying video.bsky.app first.") - embed = upload_video_via_video_service_embed_dict( - client=client, - video_path=video_path, - service_url=service_url, - alt_text=alt_text, + if token.startswith("www."): + return f"https://{token}" + + return None + + +def canonicalize_tweet_url(url): + if not url: + return None + + url = url.strip() + match = re.search( + r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", + url, + re.IGNORECASE, ) - if embed: - return embed + if not match: + return url.lower() - if allow_pds_video_fallback: - logging.warning("⚠️ video.bsky.app failed; trying direct PDS fallback.") - return upload_video_via_pds_embed_dict( - client=client, - video_path=video_path, - alt_text=alt_text, - settle_delay_seconds=settle_delay_seconds, - ) + handle = match.group(1).lower() + tweet_id = match.group(2) + return f"https://x.com/{handle}/status/{tweet_id}" - logging.error("❌ video.bsky.app failed and fallback is disabled.") + +def extract_tweet_id(tweet_url): + if not tweet_url: + return None + match = re.search(r"/status/(\d+)", tweet_url) + if match: + return match.group(1) return None -# --------------------------- -# Create post -# --------------------------- -def create_post_record( - text: str, - langs: list[str], - embed_dict: dict | None = None, -) -> dict: - record = { - "$type": "app.bsky.feed.post", - "text": text.strip(), # must be plain string - "createdAt": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()), - } - if langs: - record["langs"] = langs - if embed_dict is not None: - record["embed"] = embed_dict - return record +def make_unique_video_temp_base(tweet_url=None): + tweet_id = extract_tweet_id(tweet_url) or "unknown" + ts_ms = int(time.time() * 1000) + rand = uuid.uuid4().hex[:8] + base = f"temp_video_{tweet_id}_{ts_ms}_{rand}" + logging.info(f"🎞️ Using unique temp video base: {base}") + return base -def publish_post(client: Client, record: dict) -> bool: +def remove_file_quietly(path): + if path and os.path.exists(path): + try: + os.remove(path) + logging.info(f"🧹 Removed temp file: {path}") + except Exception as e: + logging.warning(f"⚠️ Could not remove temp file {path}: {e}") + + +def is_x_or_twitter_domain(url): try: - # Use dict payload directly for max cross-version compatibility. - resp = client.com.atproto.repo.create_record( - { - "repo": client.me.did, - "collection": "app.bsky.feed.post", - "record": record, - } - ) - uri = getattr(resp, "uri", None) or (resp.get("uri") if isinstance(resp, dict) else None) - logging.info(f"βœ… Post published! URI: {uri}") - return True - except Exception as e: - logging.error(f"❌ createRecord failed: {repr(e)}") + normalized = normalize_urlish_token(url) or url + hostname = (urlparse(normalized).hostname or "").lower() + return hostname in { + "x.com", + "www.x.com", + "twitter.com", + "www.twitter.com", + "mobile.twitter.com", + } + except Exception: return False -# --------------------------- -# Main -# --------------------------- -def main(): - setup_logging() +def is_tco_domain(url): + try: + normalized = normalize_urlish_token(url) or url + hostname = (urlparse(normalized).hostname or "").lower() + return hostname == "t.co" + except Exception: + return False - parser = argparse.ArgumentParser(description="Post text + optional image/video to Bluesky/federated PDS.") - parser.add_argument("text", help="Post text") - parser.add_argument("--username", required=True, help="Handle/email") - parser.add_argument("--password", required=True, help="App password") - parser.add_argument("--service", default="https://bsky.social", help="PDS URL") - parser.add_argument("--lang", default="ca", help="Comma-separated language codes") - parser.add_argument("--image", default=None, help="Image path") - parser.add_argument("--video", default=None, help="Video path") - parser.add_argument("--alt", default="", help="Alt text") - parser.add_argument("--video-settle-delay", type=float, default=30.0, help="Fallback indexing wait") - parser.add_argument("--allow-pds-video-fallback", action="store_true") - # Compression ON by default - parser.add_argument("--compress-video", dest="compress_video", action="store_true", default=True) - parser.add_argument("--no-compress-video", dest="compress_video", action="store_false") - parser.add_argument("--max-video-mb", type=float, default=45.0) - parser.add_argument("--ffmpeg-crf", type=int, default=28) - parser.add_argument("--ffmpeg-preset", default="veryfast") +def is_external_non_x_url(url): + if not url: + return False + return (not is_tco_domain(url)) and (not is_x_or_twitter_domain(url)) - args = parser.parse_args() - if args.image and args.video: - logging.error("❌ Use either --image or --video, not both.") - sys.exit(1) +def extract_urls_from_text(text): + if not text: + return [] - client = Client(base_url=args.service) + repaired = repair_broken_urls(text) + pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' + return re.findall(pattern, repaired) - if not login_with_backoff( - client=client, - username=args.username, - password=args.password, - service_url=args.service, - ): - sys.exit(1) - langs = [x.strip() for x in args.lang.split(",") if x.strip()] +def extract_quoted_text_from_og_title(og_title): + if not og_title: + return None - video_path_for_upload = args.video - temp_compressed_path = None + decoded = html.unescape(og_title).strip() + match = re.search(r'on X:\s*"(?P.*)"\s*/\s*X\s*$', decoded, flags=re.DOTALL) + if match: + extracted = match.group("text").strip() + if extracted: + return extracted - if args.video and args.compress_video: - compressed = compress_video_ffmpeg( - input_path=args.video, - max_size_mb=args.max_video_mb, - crf=args.ffmpeg_crf, - preset=args.ffmpeg_preset, - audio_bitrate_k=96, + first_quote = decoded.find('"') + last_quote = decoded.rfind('"') + if 0 <= first_quote < last_quote: + extracted = decoded[first_quote + 1 : last_quote].strip() + if extracted: + return extracted + + return None + + +def should_fetch_og_title(tweet): + text = clean_post_text(tweet.text or "") + urls = extract_urls_from_text(text) + + if not text: + return True + + if any(is_tco_domain(normalize_urlish_token(u) or u) for u in urls): + return True + + if "…" in text or text.endswith("..."): + return True + + if len(text) < SHORT_TWEET_OG_FETCH_THRESHOLD: + return True + + return False + + +def fetch_tweet_og_title_text(tweet_url, locale="en-US"): + if not tweet_url: + return None + + if tweet_url in _cache.og_title: + logging.info(f"⚑ Using cached og:title text for {tweet_url}") + return _cache.og_title[tweet_url] + + browser = None + browser_context = None + page = None + + try: + logging.info(f"🧾 Fetching og:title from tweet page: {tweet_url}") + + with sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + browser_context = browser.new_context( + user_agent=( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ), + viewport={"width": 1280, "height": 900}, + locale=_cache.locale, # ← USE CACHE + ) + page = browser_context.new_page() + page.goto( + tweet_url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, + ) + + try: + page.wait_for_selector( + 'meta[property="og:title"]', + timeout=OG_TITLE_WAIT_TIMEOUT_MS, + ) + except Exception: + pass + + og_title = ( + page.locator('meta[property="og:title"]') + .first.get_attribute("content") + ) + extracted = extract_quoted_text_from_og_title(og_title) + + if extracted: + extracted = clean_post_text(extracted) + _cache.og_title[tweet_url] = extracted + logging.info(f"βœ… Extracted tweet text from og:title for {tweet_url}") + return extracted + + logging.info(f"ℹ️ No usable og:title text extracted for {tweet_url}") + _cache.og_title[tweet_url] = None + return None + + except Exception as e: + logging.warning( + f"⚠️ Could not extract og:title text from {tweet_url}: {repr(e)}" ) - if compressed is None: - logging.error("❌ Compression failed; aborting.") - sys.exit(1) - video_path_for_upload = compressed - if compressed != args.video: - temp_compressed_path = compressed - - embed_dict = None - - if video_path_for_upload: - logging.info(f"🎬 Preparing video upload: {video_path_for_upload}") - embed_dict = upload_video_smart_embed_dict( - client=client, - video_path=video_path_for_upload, - service_url=args.service, - alt_text=args.alt, - settle_delay_seconds=args.video_settle_delay, - allow_pds_video_fallback=args.allow_pds_video_fallback, - ) - if embed_dict is None: - logging.error("❌ Aborting post: video upload/processing failed.") - if temp_compressed_path and os.path.exists(temp_compressed_path): - os.remove(temp_compressed_path) - sys.exit(1) - - elif args.image: - embed_dict = upload_image_embed_dict(client=client, image_path=args.image, alt_text=args.alt) - if embed_dict is None: - logging.error("❌ Aborting post: image upload failed.") - if temp_compressed_path and os.path.exists(temp_compressed_path): - os.remove(temp_compressed_path) - sys.exit(1) - - record = create_post_record(text=args.text, langs=langs, embed_dict=embed_dict) - logging.info(f"🧾 Final record text={record.get('text')!r}, has_embed={'embed' in record}") - - ok = publish_post(client=client, record=record) - - if temp_compressed_path and os.path.exists(temp_compressed_path): try: - os.remove(temp_compressed_path) - logging.info(f"🧹 Removed temp file: {temp_compressed_path}") + if page: + take_error_screenshot(page, "tweet_og_title_failed") + except Exception: + pass + _cache.og_title[tweet_url] = None + return None + finally: + try: + if page: + page.close() + except Exception: + pass + try: + if browser_context: + browser_context.close() + except Exception: + pass + try: + if browser: + browser.close() except Exception: pass - if not ok: - sys.exit(1) + +def resolve_tco_with_httpx(url, http_client): + try: + response = http_client.get( + url, timeout=URL_RESOLVE_TIMEOUT, follow_redirects=True + ) + final_url = canonicalize_url(str(response.url)) + if final_url: + logging.info(f"πŸ”— Resolved t.co with httpx: {url} -> {final_url}") + return final_url + except Exception as e: + logging.warning(f"⚠️ httpx t.co resolution failed for {url}: {repr(e)}") + + return canonicalize_url(url) + + +def resolve_tco_with_playwright(url, locale="en-US"): + browser = None + browser_context = None + page = None + + try: + logging.info(f"🌐 Resolving t.co with Playwright: {url}") + + with sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + browser_context = browser.new_context( + user_agent=( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ), + viewport={"width": 1280, "height": 900}, + locale=locale, + ) + page = browser_context.new_page() + + try: + page.goto( + url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, + ) + except Exception as e: + logging.warning( + f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}" + ) + + time.sleep(PLAYWRIGHT_POST_GOTO_SLEEP_S) + final_url = canonicalize_url(page.url) + + for _ in range(PLAYWRIGHT_IDLE_POLL_ROUNDS): + if final_url and is_external_non_x_url(final_url): + break + + try: + page.wait_for_load_state("networkidle", timeout=2000) + except Exception: + pass + + time.sleep(PLAYWRIGHT_IDLE_POLL_SLEEP_S) + final_url = canonicalize_url(page.url) + + logging.info(f"🌐 Playwright final URL for {url}: {final_url}") + return final_url + + except Exception as e: + logging.warning( + f"⚠️ Playwright t.co resolution failed for {url}: {repr(e)}" + ) + try: + if page: + take_error_screenshot(page, "tco_resolve_failed") + except Exception: + pass + finally: + try: + if page: + page.close() + except Exception: + pass + try: + if browser_context: + browser_context.close() + except Exception: + pass + try: + if browser: + browser.close() + except Exception: + pass + + return canonicalize_url(url) + + +def resolve_url_if_needed(url, http_client, allow_playwright_fallback=True): + if not url: + return None + + normalized = normalize_urlish_token(url) or url + cleaned = canonicalize_url(normalized) + if not cleaned: + return None + + if cleaned in _cache.url_resolution: + logging.info( + f"⚑ Using cached URL resolution: {cleaned} -> {_cache.url_resolution[cleaned]}" + ) + return _cache.url_resolution[cleaned] + + if not is_tco_domain(cleaned): + _cache.url_resolution[cleaned] = cleaned + return cleaned + + resolved_http = resolve_tco_with_httpx(cleaned, http_client) + if is_external_non_x_url(resolved_http): + _cache.url_resolution[cleaned] = resolved_http + return resolved_http + + if not allow_playwright_fallback: + _cache.url_resolution[cleaned] = resolved_http + return resolved_http + + resolved_browser = resolve_tco_with_playwright(cleaned) + if is_external_non_x_url(resolved_browser): + logging.info( + f"βœ… Resolved t.co via Playwright to external URL: {resolved_browser}" + ) + _cache.url_resolution[cleaned] = resolved_browser + return resolved_browser + + if resolved_http and not is_tco_domain(resolved_http): + _cache.url_resolution[cleaned] = resolved_http + return resolved_http + + _cache.url_resolution[cleaned] = cleaned + return cleaned + + +def extract_non_x_urls_from_text(text): + urls = extract_urls_from_text(text) + result = [] + + for url in urls: + normalized = normalize_urlish_token(url) + cleaned = strip_trailing_url_punctuation(normalized or url) + if not cleaned: + continue + + if is_tco_domain(cleaned): + result.append(cleaned) + continue + + if not is_x_or_twitter_domain(cleaned): + result.append(cleaned) + + return result + + +def extract_ordered_non_x_urls(text): + seen = set() + ordered = [] + + for url in extract_non_x_urls_from_text(text): + canonical = canonicalize_url(url) + if canonical and canonical not in seen: + seen.add(canonical) + ordered.append(canonical) + + return ordered + + +def extract_first_visible_non_x_url(text): + for url in extract_non_x_urls_from_text(text or ""): + canonical = canonicalize_url(url) + if canonical: + return canonical + return None + + +def extract_first_resolved_external_url( + text, http_client, allow_playwright_fallback=True +): + for url in extract_non_x_urls_from_text(text or ""): + resolved = resolve_url_if_needed( + url, + http_client, + allow_playwright_fallback=allow_playwright_fallback, + ) + if not resolved: + continue + + if is_external_non_x_url(resolved): + logging.info(f"βœ… Selected resolved external URL for card: {resolved}") + return resolved + + return None + + +def resolve_card_url(card_url, http_client): + if not card_url: + return None + + cleaned = canonicalize_url(card_url.strip()) + if not cleaned: + return None + + if is_external_non_x_url(cleaned): + logging.info(f"πŸ”— Card URL is already external: {cleaned}") + return cleaned + + if is_tco_domain(cleaned): + resolved = resolve_url_if_needed( + cleaned, http_client, allow_playwright_fallback=True + ) + if resolved and is_external_non_x_url(resolved): + logging.info(f"πŸ”— Resolved card t.co URL: {cleaned} -> {resolved}") + return resolved + + if is_x_or_twitter_domain(cleaned): + logging.info( + f"ℹ️ Card URL resolves to X/Twitter domain, ignoring: {cleaned}" + ) + return None + + return cleaned + + +def sanitize_visible_urls_in_text(text, http_client, has_media=False): + if not text: + return text, None + + working = clean_post_text(text) + url_pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' + urls = re.findall(url_pattern, working) + + if not urls: + return working, None + + replacements = {} + first_external_resolved = None + + for raw_url in urls: + normalized = normalize_urlish_token(raw_url) + cleaned = canonicalize_url(normalized or raw_url) + if not cleaned: + continue + + if is_x_or_twitter_domain(cleaned): + replacements[raw_url] = "" + logging.info(f"🧹 Removing X/Twitter URL from visible text: {cleaned}") + continue + + final_url = cleaned + if is_tco_domain(cleaned): + resolved_http_first = resolve_tco_with_httpx(cleaned, http_client) + + if is_external_non_x_url(resolved_http_first): + final_url = resolved_http_first + _cache.url_resolution[cleaned] = final_url + else: + if ( + has_media + and resolved_http_first + and is_x_or_twitter_domain(resolved_http_first) + ): + final_url = resolved_http_first + _cache.url_resolution[cleaned] = final_url + logging.info( + f"⚑ Skipping Playwright t.co fallback because tweet has media " + f"and httpx already resolved to X/Twitter URL: {final_url}" + ) + else: + final_url = resolve_url_if_needed( + cleaned, http_client, allow_playwright_fallback=True + ) + + if is_x_or_twitter_domain(final_url): + replacements[raw_url] = "" + logging.info( + f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}" + ) + continue + + if normalized and normalized.startswith("https://www."): + final_url = normalized + elif normalized and normalized.startswith("http://www."): + final_url = normalized + + if is_external_non_x_url(final_url) and not first_external_resolved: + first_external_resolved = final_url + + replacements[raw_url] = final_url + + def replace_match(match): + raw = match.group(0) + return replacements.get(raw, raw) + + working = re.sub(url_pattern, replace_match, working) + + deduped_lines = [] + for line in working.splitlines(): + line_urls = re.findall(url_pattern, line) + if len(line_urls) > 1: + prefix = re.sub(url_pattern, "", line).strip() + kept_urls = [] + seen_in_line: set = set() + + for url in line_urls: + normalized = normalize_urlish_token(url) or url + canonical = canonicalize_url(normalized) + + if not canonical: + continue + if is_x_or_twitter_domain(canonical): + continue + if canonical in seen_in_line: + continue + + seen_in_line.add(canonical) + kept_urls.append(url) + + if prefix and kept_urls: + rebuilt = prefix + " " + " ".join(kept_urls) + elif prefix: + rebuilt = prefix + else: + rebuilt = " ".join(kept_urls) + + deduped_lines.append(rebuilt.strip()) + else: + cleaned_line = re.sub(r"\s{2,}", " ", line).strip() + deduped_lines.append(cleaned_line) + + working = "\n".join(deduped_lines) + working = re.sub(r"[ \t]+", " ", working) + working = re.sub(r"\n{3,}", "\n\n", working).strip() + + return working, first_external_resolved + + +def build_effective_tweet_text(tweet, http_client): + scraped_text = clean_post_text(tweet.text or "") + has_media = bool(tweet.media) + og_title_text = None + + if should_fetch_og_title(tweet): + og_title_text = fetch_tweet_og_title_text(tweet.tweet_url) + + candidate_text = scraped_text + if og_title_text: + scraped_urls = extract_urls_from_text(scraped_text) + og_urls = extract_urls_from_text(og_title_text) + + if len(og_title_text) >= len(scraped_text) or (og_urls and not scraped_urls): + candidate_text = og_title_text + logging.info("🧾 Using og:title-derived tweet text as primary content") + + candidate_text, resolved_primary_external_url = sanitize_visible_urls_in_text( + candidate_text, http_client, has_media=has_media, + ) + candidate_text = clean_post_text(candidate_text) + + resolved_card_url = resolve_card_url( + getattr(tweet, "card_url", None), http_client + ) + + if resolved_card_url and is_external_non_x_url(resolved_card_url): + if not resolved_primary_external_url: + resolved_primary_external_url = resolved_card_url + logging.info( + f"πŸ”— Using resolved card URL as primary external URL: {resolved_card_url}" + ) + elif resolved_primary_external_url != resolved_card_url: + logging.info( + f"ℹ️ Card URL ({resolved_card_url}) differs from text URL " + f"({resolved_primary_external_url}). Preferring card URL for external embed." + ) + resolved_primary_external_url = resolved_card_url + + if not resolved_primary_external_url: + resolved_primary_external_url = extract_first_resolved_external_url( + candidate_text, + http_client, + allow_playwright_fallback=not has_media, + ) + + return candidate_text, resolved_primary_external_url + + +def remove_url_from_visible_text(text, url_to_remove): + if not text or not url_to_remove: + return text + + canonical_target = canonicalize_url(url_to_remove) + lines = text.splitlines() + cleaned_lines = [] + + for line in lines: + line_urls = extract_urls_from_text(line) + new_line = line + + for url in line_urls: + normalized = normalize_urlish_token(url) or url + cleaned_candidate = canonicalize_url( + strip_trailing_url_punctuation(normalized) + ) + if cleaned_candidate == canonical_target: + pattern = re.escape(url) + new_line = re.sub(pattern, "", new_line) + + new_line = re.sub(r"[ \t]+", " ", new_line).strip() + cleaned_lines.append(new_line) + + result = "\n".join(cleaned_lines) + result = re.sub(r"[ \t]+", " ", result) + result = re.sub(r"\n{3,}", "\n\n", result).strip() + + return result + + +def looks_like_title_plus_url_post(text): + if not text: + return False + + repaired = repair_broken_urls(text) + repaired = strip_line_edge_whitespace(repaired) + lines = [line.strip() for line in repaired.splitlines() if line.strip()] + if len(lines) < 2: + return False + + last_line = lines[-1] + urls_in_last_line = extract_ordered_non_x_urls(last_line) + total_urls = extract_ordered_non_x_urls(repaired) + + return ( + len(urls_in_last_line) == 1 + and len(total_urls) == 1 + and last_line.startswith(("http://", "https://", "www.")) + ) + + +def looks_like_url_and_tag_tail(text, primary_non_x_url=None): + if not text or not primary_non_x_url: + return False + + repaired = repair_broken_urls(text) + idx = repaired.find(primary_non_x_url) + if idx == -1: + return False + + tail = repaired[idx:].strip() + if not tail.startswith(("http://", "https://", "www.")): + return False + + if re.search(r"(?:https?://|www\.)\S+.*#[^\s#]+", tail): + return True + + return False + + +def find_tail_preservation_start(text, primary_non_x_url): + if not text or not primary_non_x_url: + return None + + url_pos = text.find(primary_non_x_url) + if url_pos == -1: + return None + + hashtag_match = re.search(r"\s#[^\s#]+", text[url_pos:]) + has_hashtag_after_url = hashtag_match is not None + + candidates = [url_pos] + + clause_patterns = [ + r"\.\s+", r":\s+", r";\s+", r"!\s+", r"\?\s+", r",\s+", + ] + + before = text[:url_pos] + for pattern in clause_patterns: + for match in re.finditer(pattern, before): + candidates.append(match.end()) + + last_newline = before.rfind("\n") + if last_newline != -1: + candidates.append(last_newline + 1) + + if has_hashtag_after_url: + generous_start = max(0, url_pos - URL_TAIL_MAX_LOOKBACK_CHARS) + while generous_start > 0 and text[generous_start] not in {" ", "\n"}: + generous_start -= 1 + candidates.append(generous_start) + + reasonable_candidates = [ + c for c in candidates + if 0 <= c < url_pos and (url_pos - c) <= URL_TAIL_MAX_CLAUSE_DISTANCE + ] + + if reasonable_candidates: + start = min(reasonable_candidates, key=lambda c: (url_pos - c)) + if url_pos - start < URL_TAIL_MIN_PREFIX_CHARS: + farther = [ + c for c in reasonable_candidates + if url_pos - c >= URL_TAIL_MIN_PREFIX_CHARS + ] + if farther: + start = min(farther, key=lambda c: (url_pos - c)) + return start + + return url_pos + +def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH): + if grapheme_len(text) <= max_length: + return text + + clusters = list(grapheme.graphemes(text)) + truncated = "".join(clusters[:max_length]) + last_space = truncated.rfind(" ") + if last_space > TRUNCATE_MIN_PREFIX_CHARS: + return truncated[:last_space] + return truncated +def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LENGTH): + if ( + not text + or tail_start is None + or tail_start < 0 + or tail_start >= len(text) + ): + return truncate_text_safely(text, max_length) + + if len(text) <= max_length: + return text + + tail = text[tail_start:].strip() + if not tail: + return truncate_text_safely(text, max_length) + + reserve = len(tail) + 1 + if reserve >= max_length: + shortened_tail = tail[-max_length:].strip() + first_space = shortened_tail.find(" ") + if 0 <= first_space <= 30: + shortened_tail = shortened_tail[first_space + 1:].strip() + return shortened_tail + + available_prefix = max_length - reserve + prefix = text[:tail_start].rstrip() + + if len(prefix) > available_prefix: + prefix = prefix[:available_prefix].rstrip() + last_space = prefix.rfind(" ") + if last_space > 20: + prefix = prefix[:last_space].rstrip() + + final_text = f"{prefix} {tail}".strip() + final_text = re.sub(r"[ \t]+", " ", final_text) + final_text = re.sub(r"\n{3,}", "\n\n", final_text).strip() + + if len(final_text) <= max_length: + return final_text + + return truncate_text_safely(text, max_length) + +def choose_final_visible_text( + full_clean_text, primary_non_x_url=None, prefer_full_text_without_url=True +): + text = clean_post_text(full_clean_text or "") + if not text: + return text + + if len(text) <= BSKY_TEXT_MAX_LENGTH: + logging.info( + "🟒 Original cleaned tweet text fits in Bluesky. Preserving exact text." + ) + return text + + if primary_non_x_url: + tail_start = find_tail_preservation_start(text, primary_non_x_url) + + if tail_start is not None: + preserved = truncate_text_preserving_tail( + text, tail_start, BSKY_TEXT_MAX_LENGTH + ) + if preserved and len(preserved) <= BSKY_TEXT_MAX_LENGTH: + logging.info( + "πŸ”— Preserving meaningful ending block with URL/hashtags in visible Bluesky text" + ) + return preserved + + if prefer_full_text_without_url and not looks_like_url_and_tag_tail( + text, primary_non_x_url + ): + text_without_url = remove_url_from_visible_text( + text, primary_non_x_url + ).strip() + if text_without_url and len(text_without_url) <= BSKY_TEXT_MAX_LENGTH: + logging.info( + "πŸ”— Keeping full visible text by removing long external URL from body and using external card" + ) + return text_without_url + + truncated = truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH) + logging.info("βœ‚οΈ Falling back to safe truncation for visible Bluesky text") + return truncated + + +def normalize_post_text(text): + if not text: + return "" + + text = clean_post_text(text) + text = text.replace("\r", "\n") + text = re.sub(r"\s+", " ", text).strip() + return text.lower() + + +def build_media_fingerprint(tweet): + if not tweet or not tweet.media: + return "no-media" + + parts = [] + + for media in tweet.media: + media_type = getattr(media, "type", "unknown") + media_url = getattr(media, "media_url_https", "") or "" + stable_value = media_url + + if media_type == "photo": + stable_value = re.sub(r"[?&]name=\w+", "", stable_value) + stable_value = re.sub(r"[?&]format=\w+", "", stable_value) + elif media_type == "video": + stable_value = canonicalize_tweet_url( + tweet.tweet_url or media_url or "" + ) + + parts.append(f"{media_type}:{stable_value}") + + parts.sort() + raw = "|".join(parts) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def build_bsky_media_fingerprint(post_view): + try: + embed = getattr(post_view, "embed", None) + if not embed: + return "no-media" + + parts = [] + + images = getattr(embed, "images", None) + if images: + for img in images: + image_obj = getattr(img, "image", None) + ref = ( + getattr(image_obj, "ref", None) + or getattr(image_obj, "cid", None) + or str(image_obj) + ) + parts.append(f"photo:{ref}") + + video = getattr(embed, "video", None) + if video: + ref = ( + getattr(video, "ref", None) + or getattr(video, "cid", None) + or str(video) + ) + parts.append(f"video:{ref}") + + external = getattr(embed, "external", None) + if external: + uri = getattr(external, "uri", None) or str(external) + parts.append(f"external:{uri}") + + if not parts: + return "no-media" + + parts.sort() + raw = "|".join(parts) + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + except Exception as e: + logging.debug(f"Could not build Bluesky media fingerprint: {e}") + return "no-media" + + +def build_text_media_key(normalized_text, media_fingerprint): + return hashlib.sha256( + f"{normalized_text}||{media_fingerprint}".encode("utf-8") + ).hexdigest() + + +# --- Login hardening helpers (NEW) --- +def is_rate_limited_error(error_obj): + text = repr(error_obj).lower() + return ( + "429" in text + or "ratelimitexceeded" in text + or "too many requests" in text + or "rate limit" in text + ) + + +def is_auth_error(error_obj): + text = repr(error_obj).lower() + return ( + "401" in text + or "403" in text + or "invalid identifier or password" in text + or "authenticationrequired" in text + or "invalidtoken" in text + ) + + +def is_network_error(error_obj): + text = repr(error_obj) + signals = [ + "ConnectError", + "RemoteProtocolError", + "ReadTimeout", + "WriteTimeout", + "TimeoutException", + "503", + "502", + "504", + "ConnectionResetError", + ] + return any(sig in text for sig in signals) + + +def create_bsky_client(base_url, handle, password): + normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/") + logging.info(f"πŸ” Connecting Bluesky client via base URL: {normalized_base_url}") + + client = Client(base_url=normalized_base_url) + + max_attempts = BSKY_LOGIN_MAX_RETRIES + base_delay = BSKY_LOGIN_BASE_DELAY + max_delay = BSKY_LOGIN_MAX_DELAY + jitter_max = max(BSKY_LOGIN_JITTER_MAX, 0.0) + + for attempt in range(1, max_attempts + 1): + try: + logging.info(f"πŸ” Bluesky login attempt {attempt}/{max_attempts} for {handle}") + client.login(handle, password) + logging.info("βœ… Bluesky login successful.") + return client + + except Exception as e: + logging.exception("❌ Bluesky login exception") + + # Fail fast on invalid credentials + if is_auth_error(e): + logging.error("❌ Bluesky auth failed (invalid handle/app password).") + raise + + # Respect explicit rate-limit timing + if is_rate_limited_error(e): + if attempt < max_attempts: + wait = get_rate_limit_wait_seconds(e, default_delay=base_delay) + wait = wait + random.uniform(0, jitter_max) + logging.warning( + f"⏳ Bluesky login rate-limited (attempt {attempt}/{max_attempts}). " + f"Retrying in {wait:.1f}s." + ) + time.sleep(wait) + continue + + logging.error("❌ Exhausted Bluesky login retries due to rate limiting.") + raise + + # Retry transient/network problems + if is_network_error(e) or is_transient_error(e): + if attempt < max_attempts: + wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter_max) + logging.warning( + f"⏳ Transient Bluesky login failure (attempt {attempt}/{max_attempts}). " + f"Retrying in {wait:.1f}s." + ) + time.sleep(wait) + continue + + logging.error("❌ Exhausted Bluesky login retries after transient/network errors.") + raise + + # Unknown errors: bounded retry anyway + if attempt < max_attempts: + wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter_max) + logging.warning( + f"⏳ Bluesky login retry for unexpected error " + f"(attempt {attempt}/{max_attempts}) in {wait:.1f}s." + ) + time.sleep(wait) + continue + + raise + + raise RuntimeError("Bluesky login failed after all retries.") + + +# --- State Management --- +def default_state(): + return { + "version": 1, + "posted_tweets": {}, + "posted_by_bsky_uri": {}, + "updated_at": None, + } + + +def load_state(state_path=STATE_PATH): + if not os.path.exists(state_path): + logging.info( + f"🧠 No state file found at {state_path}. Starting with empty memory." + ) + return default_state() + + try: + with open(state_path, "r", encoding="utf-8") as f: + state = json.load(f) + + if not isinstance(state, dict): + logging.warning("⚠️ State file is invalid. Reinitializing.") + return default_state() + + state.setdefault("version", 1) + state.setdefault("posted_tweets", {}) + state.setdefault("posted_by_bsky_uri", {}) + state.setdefault("updated_at", None) + + return state + + except Exception as e: + logging.warning( + f"⚠️ Could not load state file {state_path}: {e}. Reinitializing." + ) + return default_state() + + +def save_state(state, state_path=STATE_PATH): + try: + state["updated_at"] = arrow.utcnow().isoformat() + temp_path = f"{state_path}.tmp" + + with open(temp_path, "w", encoding="utf-8") as f: + json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True) + + os.replace(temp_path, state_path) + logging.info(f"πŸ’Ύ State saved to {state_path}") + + except Exception as e: + logging.error(f"❌ Failed to save state file {state_path}: {e}") + + +def remember_posted_tweet(state, candidate, bsky_uri=None): + canonical_tweet_url = candidate.get("canonical_tweet_url") + fallback_key = f"textmedia:{candidate['text_media_key']}" + state_key = canonical_tweet_url or fallback_key + + record = { + "canonical_tweet_url": canonical_tweet_url, + "normalized_text": candidate["normalized_text"], + "raw_text": candidate["raw_text"], + "full_clean_text": candidate.get("full_clean_text", candidate["raw_text"]), + "media_fingerprint": candidate["media_fingerprint"], + "text_media_key": candidate["text_media_key"], + "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]), + "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []), + "resolved_primary_external_url": candidate.get("resolved_primary_external_url"), + "bsky_uri": bsky_uri, + "tweet_created_on": candidate["tweet"].created_on, + "tweet_url": candidate["tweet"].tweet_url, + "posted_at": arrow.utcnow().isoformat(), + } + + state["posted_tweets"][state_key] = record + + if bsky_uri: + state["posted_by_bsky_uri"][bsky_uri] = state_key + + +def candidate_matches_state(candidate, state): + canonical_tweet_url = candidate["canonical_tweet_url"] + text_media_key = candidate["text_media_key"] + normalized_text = candidate["normalized_text"] + + posted_tweets = state.get("posted_tweets", {}) + + if canonical_tweet_url and canonical_tweet_url in posted_tweets: + return True, "state:tweet_url" + + for _, record in posted_tweets.items(): + if record.get("text_media_key") == text_media_key: + return True, "state:text_media_fingerprint" + + for _, record in posted_tweets.items(): + if record.get("normalized_text") == normalized_text: + return True, "state:normalized_text" + + return False, None + + +def prune_state(state, max_entries=5000): + posted_tweets = state.get("posted_tweets", {}) + + if len(posted_tweets) <= max_entries: + return state + + sortable = [] + for key, record in posted_tweets.items(): + posted_at = record.get("posted_at") or "" + sortable.append((key, posted_at)) + + sortable.sort(key=lambda x: x[1], reverse=True) + keep_keys = {key for key, _ in sortable[:max_entries]} + + new_posted_tweets = { + key: record + for key, record in posted_tweets.items() + if key in keep_keys + } + new_posted_by_bsky_uri = { + bsky_uri: key + for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items() + if key in keep_keys + } + + state["posted_tweets"] = new_posted_tweets + state["posted_by_bsky_uri"] = new_posted_by_bsky_uri + return state + + +# --- Bluesky Feed Helpers --- +def extract_urls_from_facets(record): + urls = [] + + try: + facets = getattr(record, "facets", None) or [] + for facet in facets: + features = getattr(facet, "features", None) or [] + for feature in features: + uri = getattr(feature, "uri", None) + if uri: + urls.append(uri) + except Exception as e: + logging.debug(f"Could not extract facet URLs: {e}") + + return urls + + +def get_recent_bsky_posts(client, handle, limit=30): + recent_posts = [] + + try: + timeline = client.get_author_feed(handle, limit=limit) + + for item in timeline.feed: + try: + if item.reason is not None: + continue + + record = item.post.record + if getattr(record, "reply", None) is not None: + continue + + text = getattr(record, "text", "") or "" + normalized_text = normalize_post_text(text) + + urls = [] + urls.extend(extract_non_x_urls_from_text(text)) + urls.extend(extract_urls_from_facets(record)) + + canonical_non_x_urls = set() + for url in urls: + if not is_tco_domain(url) and not is_x_or_twitter_domain(url): + canonical = canonicalize_url( + normalize_urlish_token(url) or url + ) + if canonical: + canonical_non_x_urls.add(canonical) + + media_fingerprint = build_bsky_media_fingerprint(item.post) + text_media_key = build_text_media_key( + normalized_text, media_fingerprint + ) + + recent_posts.append( + { + "uri": getattr(item.post, "uri", None), + "text": text, + "normalized_text": normalized_text, + "canonical_non_x_urls": canonical_non_x_urls, + "media_fingerprint": media_fingerprint, + "text_media_key": text_media_key, + "created_at": getattr(record, "created_at", None), + } + ) + + except Exception as e: + logging.debug( + f"Skipping one Bluesky feed item during dedupe fetch: {e}" + ) + + except Exception as e: + logging.warning( + f"⚠️ Could not fetch recent Bluesky posts for duplicate detection " + f"(live dedup disabled for this cycle): {e}" + ) + + return recent_posts + + +# --- Upload / Retry Helpers --- +def get_rate_limit_wait_seconds(error_obj, default_delay): + """ + Parse common rate-limit headers and return a bounded wait time in seconds. + Supports: + - retry-after + - x-ratelimit-after + - ratelimit-reset (unix timestamp) + """ + try: + now_ts = int(time.time()) + + # Direct headers on exception + headers = getattr(error_obj, "headers", None) or {} + retry_after = headers.get("retry-after") or headers.get("Retry-After") + if retry_after: + return min(max(int(retry_after), 1), BSKY_LOGIN_MAX_DELAY) + + x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") + if x_after: + return min(max(int(x_after), 1), BSKY_LOGIN_MAX_DELAY) + + reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") + if reset_value: + wait_seconds = max(int(reset_value) - now_ts + 1, default_delay) + return min(wait_seconds, BSKY_LOGIN_MAX_DELAY) + except Exception: + pass + + try: + # Nested response headers + response = getattr(error_obj, "response", None) + headers = getattr(response, "headers", None) or {} + now_ts = int(time.time()) + + retry_after = headers.get("retry-after") or headers.get("Retry-After") + if retry_after: + return min(max(int(retry_after), 1), BSKY_LOGIN_MAX_DELAY) + + x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") + if x_after: + return min(max(int(x_after), 1), BSKY_LOGIN_MAX_DELAY) + + reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") + if reset_value: + wait_seconds = max(int(reset_value) - now_ts + 1, default_delay) + return min(wait_seconds, BSKY_LOGIN_MAX_DELAY) + except Exception: + pass + + # repr fallback parsing + text = repr(error_obj) + m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE) + if m: + return min(max(int(m.group(1)), 1), BSKY_LOGIN_MAX_DELAY) + + m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE) + if m: + return min(max(int(m.group(1)), 1), BSKY_LOGIN_MAX_DELAY) + + m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE) + if m: + now_ts = int(time.time()) + wait_seconds = max(int(m.group(1)) - now_ts + 1, default_delay) + return min(wait_seconds, BSKY_LOGIN_MAX_DELAY) + + return default_delay + + +def is_transient_error(error_obj): + error_text = repr(error_obj) + transient_signals = [ + "InvokeTimeoutError", + "ReadTimeout", + "WriteTimeout", + "TimeoutException", + "RemoteProtocolError", + "ConnectError", + "503", + "502", + "504", + ] + return any(signal in error_text for signal in transient_signals) + + +def upload_blob_with_retry(client, binary_data, media_label="media"): + last_exception = None + transient_attempts = 0 + + for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1): + try: + result = client.upload_blob(binary_data) + return result.blob + + except Exception as e: + last_exception = e + error_text = str(e) + is_rate_limited = ( + "429" in error_text or "RateLimitExceeded" in error_text + ) + + if is_rate_limited: + backoff_delay = min( + BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), + BSKY_BLOB_UPLOAD_MAX_DELAY, + ) + wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) + + if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: + logging.warning( + f"⏳ Bluesky blob upload rate-limited for {media_label}. " + f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s." + ) + time.sleep(wait_seconds) + continue + else: + logging.warning( + f"❌ Exhausted blob upload retries for {media_label} " + f"after rate limiting: {repr(e)}" + ) + break + + if ( + is_transient_error(e) + and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES + ): + transient_attempts += 1 + wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts + logging.warning( + f"⏳ Transient blob upload failure for {media_label}: {repr(e)}. " + f"Transient retry {transient_attempts}/" + f"{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s." + ) + time.sleep(wait_seconds) + continue + + logging.warning(f"Could not upload {media_label}: {repr(e)}") + + if hasattr(e, "response") and e.response is not None: + try: + logging.warning( + f"Upload response status: {e.response.status_code}" + ) + logging.warning(f"Upload response body: {e.response.text}") + except Exception: + pass + + return None + + logging.warning(f"Could not upload {media_label}: {repr(last_exception)}") + return None + + +def send_post_with_retry(client, **kwargs): + last_exception = None + + for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1): + try: + return client.send_post(**kwargs) + + except Exception as e: + last_exception = e + error_text = str(e) + is_rate_limited = ( + "429" in error_text or "RateLimitExceeded" in error_text + ) + + if is_rate_limited: + backoff_delay = min( + BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)), + BSKY_SEND_POST_MAX_DELAY, + ) + wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) + + if attempt < BSKY_SEND_POST_MAX_RETRIES: + logging.warning( + f"⏳ Bluesky send_post rate-limited. " + f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s." + ) + time.sleep(wait_seconds) + continue + else: + logging.error( + f"❌ Exhausted send_post retries after rate limiting: {repr(e)}" + ) + raise + + if is_transient_error(e) and attempt < BSKY_SEND_POST_MAX_RETRIES: + wait_seconds = BSKY_SEND_POST_BASE_DELAY * attempt + logging.warning( + f"⏳ Transient send_post failure: {repr(e)}. " + f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s." + ) + time.sleep(wait_seconds) + continue + + raise + + raise last_exception + +def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES): + try: + with Image.open(io.BytesIO(image_bytes)) as img: + img = img.convert("RGB") + + width, height = img.size + max_dim = max(width, height) + + if max_dim > BSKY_IMAGE_MAX_DIMENSION: + scale = BSKY_IMAGE_MAX_DIMENSION / max_dim + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) + img = img.resize(new_size, Image.LANCZOS) + logging.info( + f"πŸ–ΌοΈ Resized post image to {new_size[0]}x{new_size[1]}" + ) + + for quality in [90, 82, 75, 68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]: + out = io.BytesIO() + img.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) + data = out.getvalue() + logging.info( + f"πŸ–ΌοΈ Post image candidate size at JPEG quality {quality}: " + f"{len(data)} bytes ({len(data) / 1024:.2f} KB)" + ) + if len(data) <= max_bytes: + return data + + for target_dim in [1800, 1600, 1400, 1200, 1000]: + resized = img.copy() + width, height = resized.size + max_dim = max(width, height) + + if max_dim > target_dim: + scale = target_dim / max_dim + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) + resized = resized.resize(new_size, Image.LANCZOS) + + for quality in [68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]: + out = io.BytesIO() + resized.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) + data = out.getvalue() + logging.info( + f"πŸ–ΌοΈ Post image resized to <= {target_dim}px at quality {quality}: " + f"{len(data)} bytes ({len(data) / 1024:.2f} KB)" + ) + if len(data) <= max_bytes: + return data + + except Exception as e: + logging.warning(f"Could not compress post image: {repr(e)}") + + return None + + +def get_blob_from_url(media_url, client, http_client): + try: + r = http_client.get( + media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True + ) + if r.status_code != 200: + logging.warning( + f"Could not fetch media {media_url}: HTTP {r.status_code}" + ) + return None + + content = r.content + if not content: + logging.warning( + f"Could not fetch media {media_url}: empty response body" + ) + return None + + content_type = (r.headers.get("content-type") or "").lower() + upload_bytes = content + + if content_type.startswith("image/"): + original_size = len(content) + logging.info( + f"πŸ–ΌοΈ Downloaded post image {media_url} " + f"({original_size} bytes / {original_size / 1024:.2f} KB)" + ) + + if original_size > BSKY_IMAGE_MAX_BYTES: + logging.info( + f"πŸ–ΌοΈ Post image exceeds safe Bluesky limit " + f"({original_size} bytes > {BSKY_IMAGE_MAX_BYTES} bytes). Compressing..." + ) + compressed = compress_post_image_to_limit( + content, BSKY_IMAGE_MAX_BYTES + ) + if compressed: + upload_bytes = compressed + logging.info( + f"βœ… Post image compressed to {len(upload_bytes)} bytes " + f"({len(upload_bytes) / 1024:.2f} KB)" + ) + else: + logging.warning( + f"⚠️ Could not compress post image to safe limit: {media_url}" + ) + return None + + return upload_blob_with_retry(client, upload_bytes, media_label=media_url) + + except Exception as e: + logging.warning(f"Could not fetch media {media_url}: {repr(e)}") + return None + + +def get_blob_from_file(file_path, client): + try: + if not os.path.exists(file_path): + logging.warning( + f"Could not upload local file {file_path}: file does not exist" + ) + return None + + file_size = os.path.getsize(file_path) + file_size_mb = file_size / (1024 * 1024) + + logging.info( + f"πŸ“¦ Uploading local file {file_path} ({file_size_mb:.2f} MB)" + ) + + if ( + file_path.lower().endswith(".mp4") + and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB + ): + logging.warning( + f"Could not upload local file {file_path}: " + f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)" + ) + return None + + with open(file_path, "rb") as f: + binary_data = f.read() + + return upload_blob_with_retry(client, binary_data, media_label=file_path) + + except Exception as e: + logging.warning(f"Could not upload local file {file_path}: {repr(e)}") + + if hasattr(e, "response") and e.response is not None: + try: + logging.warning( + f"Upload response status: {e.response.status_code}" + ) + logging.warning(f"Upload response body: {e.response.text}") + except Exception: + pass + + return None + + +def compress_external_thumb_to_limit( + image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES +): + try: + with Image.open(io.BytesIO(image_bytes)) as img: + img = img.convert("RGB") + + width, height = img.size + max_dim = max(width, height) + + if max_dim > EXTERNAL_THUMB_MAX_DIMENSION: + scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) + img = img.resize(new_size, Image.LANCZOS) + logging.info( + f"πŸ–ΌοΈ Resized external thumb to {new_size[0]}x{new_size[1]}" + ) + + for quality in [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: + out = io.BytesIO() + img.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) + data = out.getvalue() + logging.info( + f"πŸ–ΌοΈ External thumb candidate size at JPEG quality {quality}: " + f"{len(data) / 1024:.2f} KB" + ) + if len(data) <= max_bytes: + return data + + for target_dim in [1000, 900, 800, 700, 600]: + resized = img.copy() + width, height = resized.size + max_dim = max(width, height) + + if max_dim > target_dim: + scale = target_dim / max_dim + new_size = ( + max(1, int(width * scale)), + max(1, int(height * scale)), + ) + resized = resized.resize(new_size, Image.LANCZOS) + + for quality in [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: + out = io.BytesIO() + resized.save( + out, + format="JPEG", + quality=quality, + optimize=True, + progressive=True, + ) + data = out.getvalue() + logging.info( + f"πŸ–ΌοΈ External thumb resized to <= {target_dim}px at quality {quality}: " + f"{len(data) / 1024:.2f} KB" + ) + if len(data) <= max_bytes: + return data + + except Exception as e: + logging.warning(f"Could not compress external thumbnail: {repr(e)}") + + return None + + +def get_external_thumb_blob_from_url(image_url, client, http_client): + try: + r = http_client.get( + image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True + ) + if r.status_code != 200: + logging.warning( + f"Could not fetch external thumb {image_url}: HTTP {r.status_code}" + ) + return None + + content = r.content + if not content: + logging.warning( + f"Could not fetch external thumb {image_url}: empty body" + ) + return None + + original_size_kb = len(content) / 1024 + logging.info( + f"πŸ–ΌοΈ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)" + ) + + upload_bytes = content + if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES: + logging.info( + f"πŸ–ΌοΈ External thumb exceeds safe limit " + f"({original_size_kb:.2f} KB > {EXTERNAL_THUMB_MAX_BYTES / 1024:.2f} KB). Compressing..." + ) + compressed = compress_external_thumb_to_limit( + upload_bytes, EXTERNAL_THUMB_MAX_BYTES + ) + if compressed: + upload_bytes = compressed + logging.info( + f"βœ… External thumb compressed to {len(upload_bytes) / 1024:.2f} KB" + ) + else: + logging.warning( + "⚠️ Could not compress external thumb to fit limit. Will omit thumbnail." + ) + return None + else: + logging.info("βœ… External thumb already within safe size limit.") + + blob = upload_blob_with_retry( + client, + upload_bytes, + media_label=f"external-thumb:{image_url}", + ) + if blob: + return blob + + logging.warning("⚠️ External thumb upload failed. Will omit thumbnail.") + return None + + except Exception as e: + logging.warning( + f"Could not fetch/upload external thumb {image_url}: {repr(e)}" + ) + return None + + +def fetch_link_metadata(url, http_client): + try: + r = http_client.get( + url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True + ) + r.raise_for_status() + soup = BeautifulSoup(r.text, "html.parser") + + title = soup.find("meta", property="og:title") or soup.find("title") + desc = ( + soup.find("meta", property="og:description") + or soup.find("meta", attrs={"name": "description"}) + ) + image = ( + soup.find("meta", property="og:image") + or soup.find("meta", attrs={"name": "twitter:image"}) + ) + + return { + "title": ( + title["content"] + if title and title.has_attr("content") + else (title.text.strip() if title and title.text else "") + ), + "description": ( + desc["content"] if desc and desc.has_attr("content") else "" + ), + "image": ( + image["content"] if image and image.has_attr("content") else None + ), + } + + except Exception as e: + logging.warning(f"Could not fetch link metadata for {url}: {repr(e)}") + return {} + + +def build_external_link_embed( + url, client, http_client, fallback_title="Link", prefetched_metadata=None, +): + link_metadata = ( + prefetched_metadata + if prefetched_metadata is not None + else fetch_link_metadata(url, http_client) + ) + + thumb_blob = None + if link_metadata.get("image"): + thumb_blob = get_external_thumb_blob_from_url( + link_metadata["image"], client, http_client + ) + if thumb_blob: + logging.info("βœ… External link card thumbnail prepared successfully") + else: + logging.info("ℹ️ External link card will be posted without thumbnail") + + if ( + link_metadata.get("title") + or link_metadata.get("description") + or thumb_blob + ): + return models.AppBskyEmbedExternal.Main( + external=models.AppBskyEmbedExternal.External( + uri=url, + title=link_metadata.get("title") or fallback_title, + description=link_metadata.get("description") or "", + thumb=thumb_blob, + ) + ) + + return None + + +def make_rich(content): + # NOTE: Bluesky supports native @mention facets, but resolving a Twitter + # handle to a Bluesky DID requires an external lookup. That mapping is not + # available here so @mentions are passed through as plain text. If you add + # a handle-mapping table in the future, call + # text_builder.mention(word, did) here instead of text_builder.text(word). + text_builder = client_utils.TextBuilder() + content = clean_post_text(content) + lines = content.splitlines() + + for line_idx, line in enumerate(lines): + if not line.strip(): + if line_idx < len(lines) - 1: + text_builder.text("\n") + continue + + words = line.split(" ") + for i, word in enumerate(words): + if not word: + if i < len(words) - 1: + text_builder.text(" ") + continue + + cleaned_word = strip_trailing_url_punctuation(word) + normalized_candidate = normalize_urlish_token(cleaned_word) + + if normalized_candidate: + if is_x_or_twitter_domain(normalized_candidate): + text_builder.text(word) + else: + clean_url_value = clean_url(normalized_candidate) + + if clean_url_value and is_valid_url(clean_url_value): + text_builder.link(cleaned_word, clean_url_value) + trailing = word[len(cleaned_word):] + if trailing: + text_builder.text(trailing) + else: + text_builder.text(word) + + elif cleaned_word.startswith("#") and len(cleaned_word) > 1: + clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"") + if clean_tag: + text_builder.tag(cleaned_word, clean_tag) + trailing = word[len(cleaned_word):] + if trailing: + text_builder.text(trailing) + else: + text_builder.text(word) + + else: + text_builder.text(word) + + if i < len(words) - 1: + text_builder.text(" ") + + if line_idx < len(lines) - 1: + text_builder.text("\n") + + return text_builder + + +def build_dynamic_alt(raw_text, link_title=None): + dynamic_alt = clean_post_text(raw_text) + dynamic_alt = dynamic_alt.replace("\n", " ").strip() + dynamic_alt = re.sub( + r"(?:(?:https?://)|(?:www\.))\S+", "", dynamic_alt + ).strip() + + if not dynamic_alt and link_title: + dynamic_alt = link_title.strip() + + if len(dynamic_alt) > DYNAMIC_ALT_MAX_LENGTH: + dynamic_alt = dynamic_alt[:DYNAMIC_ALT_MAX_LENGTH] + elif not dynamic_alt: + dynamic_alt = "Attached video or image from tweet" + + return dynamic_alt + + +def build_video_embed(video_blob, alt_text): + try: + return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text) + except AttributeError: + logging.error( + "❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto." + ) + return None + + +# --- Twitter Scraping --- +def scrape_tweets_via_playwright(username, password, email, target_handle, locale="en-US"): + tweets = [] + state_file = "twitter_browser_state.json" + + if os.path.exists(state_file): + try: + os.chmod(state_file, SESSION_FILE_PERMISSIONS) + except Exception as e: + logging.warning( + f"⚠️ Could not set permissions on {state_file}: {e}" + ) + + with sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + clean_ua = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ) + + browser_context = None + needs_login = True + session_check_page = None + + if os.path.exists(state_file): + logging.info( + "βœ… Found existing browser state. Attempting to bypass login..." + ) + browser_context = browser.new_context( + user_agent=clean_ua, + viewport={"width": 1920, "height": 1080}, + storage_state=state_file, + locale=locale, + ) + session_check_page = browser_context.new_page() + session_check_page.goto("https://x.com/home") + time.sleep(3) + + if ( + session_check_page.locator( + '[data-testid="SideNav_NewTweet_Button"]' + ).is_visible() + or "/home" in session_check_page.url + ): + logging.info("βœ… Session is valid!") + needs_login = False + else: + logging.warning( + "⚠️ Saved session expired or invalid. Re-logging in..." + ) + session_check_page.close() + session_check_page = None + browser_context.close() + browser_context = None + os.remove(state_file) + + if session_check_page is not None: + session_check_page.close() + session_check_page = None + + if needs_login: + logging.info( + "πŸš€ Launching fresh browser for automated Twitter login..." + ) + browser_context = browser.new_context( + user_agent=clean_ua, + viewport={"width": 1920, "height": 1080}, + locale=locale, # βœ… add this + ) + login_page = browser_context.new_page() + + try: + login_page.goto("https://x.com") + sign_in_button = login_page.get_by_text("Sign in", exact=True) + sign_in_button.wait_for(state="visible", timeout=15000) + sign_in_button.click(force=True) + + login_page.wait_for_selector( + 'h1:has-text("Sign in to X")', + state="visible", + timeout=25000, + ) + logging.info(f"πŸ‘€ Entering username: {username}...") + time.sleep(1) + + username_input = login_page.locator( + 'input[autocomplete="username"]' + ).first + username_input.wait_for(state="visible", timeout=15000) + username_input.click(force=True) + username_input.press_sequentially(username, delay=100) + + login_page.locator('button:has-text("Next")').first.click( + force=True + ) + login_page.wait_for_selector( + 'input[name="password"], ' + 'input[data-testid="ocfEnterTextTextInput"], ' + 'input[name="text"]', + timeout=15000, + ) + time.sleep(1) + + if login_page.locator( + 'input[data-testid="ocfEnterTextTextInput"]' + ).is_visible() or login_page.locator( + 'input[name="text"]' + ).is_visible(): + logging.warning( + "πŸ›‘οΈ Security challenge detected! Entering email/phone..." + ) + login_page.fill( + 'input[data-testid="ocfEnterTextTextInput"], ' + 'input[name="text"]', + email, + ) + sec_next = login_page.locator( + '[data-testid="ocfEnterTextNextButton"], ' + 'span:has-text("Next")' + ).first + if sec_next.is_visible(): + sec_next.click(force=True) + else: + login_page.keyboard.press("Enter") + login_page.wait_for_selector( + 'input[name="password"]', timeout=15000 + ) + time.sleep(1) + + logging.info("πŸ”‘ Entering password...") + login_page.fill('input[name="password"]', password) + login_page.locator('span:has-text("Log in")').first.click() + + login_page.wait_for_url("**/home", timeout=40000) + time.sleep(3) + + browser_context.storage_state(path=state_file) + try: + os.chmod(state_file, SESSION_FILE_PERMISSIONS) + except Exception as chmod_err: + logging.warning( + f"⚠️ Could not set permissions on {state_file} " + f"after save: {chmod_err}" + ) + logging.info("βœ… Login successful. Browser state saved.") + + except Exception as e: + take_error_screenshot(login_page, "login_failed") + logging.error(f"❌ Login failed: {e}") + login_page.close() + browser.close() + return [] + + login_page.close() + + logging.info( + f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets..." + ) + scrape_page = browser_context.new_page() + scrape_page.goto(f"https://x.com/{target_handle}") + + try: + scrape_page.wait_for_selector("article", timeout=40000) + time.sleep(2) + + articles = scrape_page.locator("article").all() + logging.info( + f"πŸ“Š Found {len(articles)} tweets on screen. " + f"Parsing up to {SCRAPE_TWEET_LIMIT}..." + ) + + for article in articles[:SCRAPE_TWEET_LIMIT]: + try: + time_el = article.locator("time").first + if not time_el.is_visible(): + continue + + created_at = time_el.get_attribute("datetime") + + tweet_url = None + time_link = article.locator("a:has(time)").first + if time_link.is_visible(): + href = time_link.get_attribute("href") + if href: + tweet_url = ( + f"https://x.com{href}" + if href.startswith("/") + else href + ) + + is_retweet = False + try: + social_context_el = article.locator( + '[data-testid="socialContext"]' + ).first + if social_context_el.is_visible(): + context_text = social_context_el.inner_text().lower() + repost_keywords = [ + "reposted", + "retweeted", + "ha repostejat", + "ha retuitat", + "repostejat", + "ha reposteado", + "retuiteΓ³", + ] + if any(kw in context_text for kw in repost_keywords): + is_retweet = True + logging.info( + f"πŸ” Detected retweet/repost: {tweet_url}" + ) + except Exception: + pass + + text_locator = article.locator( + '[data-testid="tweetText"]' + ).first + text = ( + text_locator.inner_text() + if text_locator.is_visible() + else "" + ) + + media_urls = [] + + photo_locators = article.locator( + '[data-testid="tweetPhoto"] img' + ).all() + for img in photo_locators: + src = img.get_attribute("src") + if src: + src = re.sub(r"&name=\w+", "&name=large", src) + media_urls.append((src, "photo")) + + video_locators = article.locator( + '[data-testid="videoPlayer"]' + ).all() + if video_locators: + media_urls.append((tweet_url or "", "video")) + + card_url = None + try: + card_locator = article.locator( + '[data-testid="card.wrapper"] a[href]' + ).first + if card_locator.is_visible(): + card_href = card_locator.get_attribute("href") + if card_href: + card_url = card_href.strip() + logging.info( + f"πŸƒ Scraped card URL from tweet: {card_url}" + ) + except Exception: + pass + + if not card_url: + try: + card_role_link = article.locator( + '[data-testid="card.wrapper"] [role="link"]' + ).first + if card_role_link.is_visible(): + card_a = card_role_link.locator("a[href]").first + if card_a.is_visible(): + card_href = card_a.get_attribute("href") + if card_href: + card_url = card_href.strip() + logging.info( + f"πŸƒ Scraped card URL (fallback) from tweet: {card_url}" + ) + except Exception: + pass + + tweets.append( + ScrapedTweet( + created_at, + text, + media_urls, + tweet_url=tweet_url, + card_url=card_url, + is_retweet=is_retweet, + ) + ) + + except Exception as e: + logging.warning(f"⚠️ Failed to parse a specific tweet: {e}") + continue + + except Exception as e: + take_error_screenshot(scrape_page, "scrape_failed") + logging.error(f"❌ Failed to scrape profile: {e}") + + browser.close() + return tweets + + +# --- Video Extraction & Processing --- +def extract_video_url_from_tweet_page(browser_context, tweet_url): + page = browser_context.new_page() + best_m3u8_url = None + best_video_mp4_url = None + seen_urls = set() # ← scoped per call, so already reset per tweet βœ… + + def is_audio_only_mp4(url, content_type): + url_l = url.lower() + content_type_l = content_type.lower() + return ( + "/aud/" in url_l + or "/audio/" in url_l + or "mp4a" in url_l + or ("audio/" in content_type_l and "video/" not in content_type_l) + ) + + def handle_response(response): + nonlocal best_m3u8_url, best_video_mp4_url + try: + url = response.url + if url in seen_urls: + return + seen_urls.add(url) + + url_l = url.lower() + content_type = response.headers.get("content-type", "") + content_type_l = content_type.lower() + + if ".m4s" in url_l: + return + + if ( + ".m3u8" in url_l + or "application/vnd.apple.mpegurl" in content_type_l + or "application/x-mpegurl" in content_type_l + ): + if best_m3u8_url is None: + best_m3u8_url = url + logging.info(f"πŸ“Ί Found HLS playlist URL: {url}") + return + + if ( + ".mp4" in url_l + or "video/mp4" in content_type_l + or "audio/mp4" in content_type_l + ): + if is_audio_only_mp4(url, content_type): + logging.info(f"πŸ”‡ Ignoring audio-only MP4: {url}") + return + + if best_video_mp4_url is None: + best_video_mp4_url = url + logging.info(f"πŸŽ₯ Found VIDEO MP4 URL: {url}") + return + + except Exception as e: + logging.debug(f"Response parsing error: {e}") + + page.on("response", handle_response) + + def current_best(): + return best_m3u8_url or best_video_mp4_url + + try: + logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}") + page.goto(tweet_url, wait_until="domcontentloaded", timeout=40000) + time.sleep(2) + + player = page.locator('[data-testid="videoPlayer"]').first + + if player.count() > 0: + try: + player.scroll_into_view_if_needed(timeout=5000) + except Exception: + pass + + try: + player.click(force=True, timeout=5000) + logging.info("▢️ Clicked video player") + except Exception as e: + logging.info(f"⚠️ First player click failed: {e}") + else: + logging.warning("⚠️ No video player locator found on tweet page") + + for _ in range(VIDEO_PLAYER_WAIT_ROUNDS): + if current_best(): + break + time.sleep(1) + + if not current_best() and player.count() > 0: + logging.info("πŸ” No media URL found yet, retrying player interaction...") + try: + player.click(force=True, timeout=5000) + time.sleep(PLAYWRIGHT_RETRY_SLEEP_S) + except Exception as e: + logging.info(f"⚠️ Retry click failed: {e}") + + try: + page.keyboard.press("Space") + time.sleep(1) + except Exception: + pass + + for _ in range(VIDEO_PLAYER_RETRY_ROUNDS): + if current_best(): + break + time.sleep(1) + + selected_url = current_best() + if selected_url: + logging.info(f"βœ… Selected media URL for download: {selected_url}") + else: + logging.warning( + f"⚠️ No playable media URL detected on tweet page: {tweet_url}" + ) + + return selected_url + + except Exception as e: + logging.warning( + f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}" + ) + return None + finally: + page.remove_listener("response", handle_response) # ← FIX 1: detach before close + page.close() + +def _probe_video_duration(file_path): + probe_cmd = [ + "ffprobe", + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + file_path, + ] + try: + result = subprocess.run( + probe_cmd, + capture_output=True, + text=True, + timeout=FFPROBE_TIMEOUT_SECONDS, + ) + if result.returncode != 0: + raise RuntimeError( + f"ffprobe exited with code {result.returncode}: " + f"{result.stderr.strip()}" + ) + duration_str = result.stdout.strip() + if not duration_str: + raise RuntimeError("ffprobe returned empty duration output") + return float(duration_str) + except subprocess.TimeoutExpired: + raise RuntimeError( + f"ffprobe timed out after {FFPROBE_TIMEOUT_SECONDS}s on {file_path}" + ) + + +def download_and_crop_video(video_url, output_path): + temp_input = output_path.replace(".mp4", "_source.mp4") + temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4") + temp_output = output_path.replace(".mp4", "_compressed.mp4") + + try: + logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}") + + video_url_l = video_url.lower() + + if ".m3u8" in video_url_l: + logging.info("πŸ“Ί Using HLS ffmpeg mode") + download_cmd = [ + "ffmpeg", "-y", + "-protocol_whitelist", "file,http,https,tcp,tls,crypto", + "-allowed_extensions", "ALL", + "-i", video_url, + "-c", "copy", + temp_input, + ] + else: + logging.info("πŸŽ₯ Using direct MP4 ffmpeg mode") + download_cmd = [ + "ffmpeg", "-y", + "-i", video_url, + "-c", "copy", + temp_input, + ] + + download_result = subprocess.run( + download_cmd, + capture_output=True, + text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, + ) + + if download_result.returncode != 0: + logging.error( + f"❌ ffmpeg download failed:\n{download_result.stderr}" + ) + return None + + if ( + not os.path.exists(temp_input) + or os.path.getsize(temp_input) == 0 + ): + logging.error("❌ Downloaded video source file is missing or empty.") + return None + + logging.info(f"βœ… Video downloaded: {temp_input}") + + try: + duration = _probe_video_duration(temp_input) + except RuntimeError as probe_err: + logging.error(f"❌ Could not probe video duration: {probe_err}") + return None + + if duration <= 0: + logging.error("❌ Downloaded video has invalid or unknown duration.") + return None + + end_time = min(VIDEO_MAX_DURATION_SECONDS, duration) + + video_clip = VideoFileClip(temp_input) + try: + if hasattr(video_clip, "subclipped"): + cropped_clip = video_clip.subclipped(0, end_time) + else: + cropped_clip = video_clip.subclip(0, end_time) + + try: + cropped_clip.write_videofile( + temp_trimmed, + codec="libx264", + audio_codec="aac", + preset="veryfast", + bitrate="1800k", + audio_bitrate="128k", + logger=None, + ) + finally: + cropped_clip.close() + finally: + video_clip.close() + + if ( + not os.path.exists(temp_trimmed) + or os.path.getsize(temp_trimmed) == 0 + ): + logging.error("❌ Trimmed video output is missing or empty.") + return None + + trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024) + logging.info( + f"πŸ“¦ Trimmed video size before compression: {trimmed_size_mb:.2f} MB" + ) + + compress_cmd = [ + "ffmpeg", "-y", + "-i", temp_trimmed, + "-vf", "scale='min(720,iw)':-2", + "-c:v", "libx264", + "-preset", "veryfast", + "-crf", "30", + "-maxrate", "1800k", + "-bufsize", "3600k", + "-c:a", "aac", + "-b:a", "128k", + "-movflags", "+faststart", + temp_output, + ] + + compress_result = subprocess.run( + compress_cmd, + capture_output=True, + text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, + ) + + if compress_result.returncode != 0: + logging.error( + f"❌ ffmpeg compression failed:\n{compress_result.stderr}" + ) + return None + + if ( + not os.path.exists(temp_output) + or os.path.getsize(temp_output) == 0 + ): + logging.error("❌ Compressed video output is missing or empty.") + return None + + final_size_mb = os.path.getsize(temp_output) / (1024 * 1024) + logging.info( + f"βœ… Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)" + ) + + os.replace(temp_output, output_path) + logging.info(f"βœ… Final video ready: {output_path}") + return output_path + + except subprocess.TimeoutExpired: + logging.error( + f"❌ ffmpeg subprocess timed out after {SUBPROCESS_TIMEOUT_SECONDS}s" + ) + return None + + except Exception as e: + logging.error(f"❌ Error processing video: {repr(e)}") + return None + + finally: + remove_file_quietly(temp_input) + remove_file_quietly(temp_trimmed) + remove_file_quietly(temp_output) + + +# --- Deduplication --- +def candidate_matches_existing_bsky(candidate, recent_bsky_posts): + candidate_non_x_urls = candidate["canonical_non_x_urls"] + candidate_text_media_key = candidate["text_media_key"] + candidate_normalized_text = candidate["normalized_text"] + + for existing in recent_bsky_posts: + existing_non_x_urls = existing["canonical_non_x_urls"] + + if ( + candidate_non_x_urls + and candidate_non_x_urls == existing_non_x_urls + and candidate_normalized_text == existing["normalized_text"] + ): + return True, "bsky:normalized_text_plus_non_x_urls" + + if candidate_text_media_key == existing["text_media_key"]: + return True, "bsky:text_media_fingerprint" + + if candidate_normalized_text == existing["normalized_text"]: + return True, "bsky:normalized_text" + + return False, None + + +# --- Main Sync Logic --- +def sync_feeds(args): + logging.info("πŸ”„ Starting sync cycle...") + dry_run = getattr(args, "dry_run", False) + bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS + bot_locale = bsky_langs_to_playwright_locale(bsky_langs) # βœ… now defined + _cache.locale = bot_locale + + if dry_run: + logging.info("πŸ§ͺ DRY RUN MODE β€” no posts will be created on Bluesky.") + + try: + state = load_state(STATE_PATH) + state = prune_state(state, max_entries=5000) + + tweets = scrape_tweets_via_playwright( + args.twitter_username, + args.twitter_password, + args.twitter_email, + args.twitter_handle, + locale=bot_locale, + ) + + if not tweets: + logging.warning( + "⚠️ No tweets found or failed to fetch. " + "Skipping Bluesky sync for this cycle." + ) + return + + bsky_client = None + if not dry_run: + bsky_client = create_bsky_client( + args.bsky_base_url, + args.bsky_handle, + args.bsky_password, + ) + + recent_bsky_posts = [] + if not dry_run: + recent_bsky_posts = get_recent_bsky_posts( + bsky_client, + args.bsky_handle, + limit=DEDUPE_BSKY_LIMIT, + ) + + logging.info( + f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts " + f"for duplicate detection." + ) + logging.info( + f"🧠 Local state currently tracks " + f"{len(state.get('posted_tweets', {}))} posted items." + ) + + too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS) + logging.info(f"πŸ•’ Will ignore tweets older than: {too_old_cutoff}") + + candidate_tweets = [] + cheap_candidates = [] + + for tweet in reversed(tweets): + try: + tweet_time = arrow.get(tweet.created_on) + + if tweet_time < too_old_cutoff: + logging.info(f"⏭️ Skipping old tweet from {tweet_time}") + continue + + if tweet.is_retweet: + logging.info( + f"⏭️ Skipping retweet/repost: {tweet.tweet_url}" + ) + continue + + canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url) + if canonical_tweet_url and canonical_tweet_url in state.get( + "posted_tweets", {} + ): + logging.info( + f"⚑ Early skip due to known tweet URL in local state: " + f"{canonical_tweet_url}" + ) + continue + + scraped_text = clean_post_text(tweet.text or "") + if not scraped_text and not tweet.media: + logging.info( + f"⏭️ Skipping empty/blank tweet from {tweet_time}" + ) + continue + + cheap_candidates.append( + (tweet, tweet_time, canonical_tweet_url) + ) + + except Exception as e: + logging.warning(f"⚠️ Failed during cheap prefilter: {e}") + + logging.info( + f"⚑ {len(cheap_candidates)} tweets remain after cheap prefilter." + ) + + with httpx.Client() as resolve_http_client: + for tweet, tweet_time, canonical_tweet_url in cheap_candidates: + try: + ( + full_clean_text, + resolved_primary_external_url, + ) = build_effective_tweet_text(tweet, resolve_http_client) + + normalized_text = normalize_post_text(full_clean_text) + + if not normalized_text and not tweet.media: + logging.info( + f"⏭️ Skipping empty/blank tweet after enrichment " + f"from {tweet_time}" + ) + continue + + ordered_non_x_urls = extract_ordered_non_x_urls( + full_clean_text + ) + + canonical_non_x_urls = set() + if resolved_primary_external_url: + canonical_non_x_urls.add( + canonicalize_url(resolved_primary_external_url) + ) + + for raw_url in ordered_non_x_urls: + if not is_tco_domain( + raw_url + ) and not is_x_or_twitter_domain(raw_url): + canonical_non_x_urls.add(canonicalize_url(raw_url)) + + primary_non_x_url = None + if resolved_primary_external_url: + primary_non_x_url = resolved_primary_external_url + else: + primary_non_x_url = extract_first_visible_non_x_url( + full_clean_text + ) + if not primary_non_x_url and ordered_non_x_urls: + primary_non_x_url = ordered_non_x_urls[0] + + has_video = any( + getattr(m, "type", None) == "video" + for m in (tweet.media or []) + ) + has_photo = any( + getattr(m, "type", None) == "photo" + for m in (tweet.media or []) + ) + + raw_text = choose_final_visible_text( + full_clean_text, + primary_non_x_url=primary_non_x_url, + prefer_full_text_without_url=False, + ) + + media_fingerprint = build_media_fingerprint(tweet) + text_media_key = build_text_media_key( + normalized_text, media_fingerprint + ) + + candidate = { + "tweet": tweet, + "tweet_time": tweet_time, + "raw_text": raw_text, + "full_clean_text": full_clean_text, + "normalized_text": normalized_text, + "media_fingerprint": media_fingerprint, + "text_media_key": text_media_key, + "canonical_tweet_url": canonical_tweet_url, + "canonical_non_x_urls": canonical_non_x_urls, + "ordered_non_x_urls": ordered_non_x_urls, + "primary_non_x_url": primary_non_x_url, + "resolved_primary_external_url": resolved_primary_external_url, + "looks_like_title_plus_url": looks_like_title_plus_url_post( + full_clean_text + ), + "has_video": has_video, + "has_photo": has_photo, + } + + is_dup_state, reason_state = candidate_matches_state( + candidate, state + ) + if is_dup_state: + logging.info( + f"⏭️ Skipping candidate due to local state duplicate " + f"match on: {reason_state}" + ) + continue + + is_dup_bsky, reason_bsky = candidate_matches_existing_bsky( + candidate, recent_bsky_posts + ) + if is_dup_bsky: + logging.info( + f"⏭️ Skipping candidate due to recent Bluesky duplicate " + f"match on: {reason_bsky}" + ) + continue + + candidate_tweets.append(candidate) + + except Exception as e: + logging.warning( + f"⚠️ Failed to prepare candidate tweet: {e}" + ) + + logging.info( + f"πŸ“¬ {len(candidate_tweets)} tweets remain after duplicate filtering." + ) + + if not candidate_tweets: + logging.info( + "βœ… No new tweets need posting after duplicate comparison." + ) + return + + new_posts = 0 + browser_state_file = "twitter_browser_state.json" + + with sync_playwright() as p, httpx.Client() as media_http_client: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + context_kwargs = { + "user_agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ), + "viewport": {"width": 1920, "height": 1080}, + "locale": bot_locale, + } + if os.path.exists(browser_state_file): + context_kwargs["storage_state"] = browser_state_file + + browser_context = browser.new_context(**context_kwargs) + + for candidate in candidate_tweets: + tweet = candidate["tweet"] + tweet_time = candidate["tweet_time"] + raw_text = candidate["raw_text"] + full_clean_text = candidate["full_clean_text"] + + logging.info( + f"πŸ“ {'[DRY RUN] Would post' if dry_run else 'Posting'} " + f"missing tweet from {tweet_time} to Bluesky..." + ) + + if dry_run: + logging.info( + f" πŸ“„ Text: {raw_text[:200]}" + ) + logging.info( + f" πŸ”— Primary external URL: " + f"{candidate.get('resolved_primary_external_url', 'None')}" + ) + logging.info( + f" πŸƒ Card URL: {getattr(tweet, 'card_url', 'None')}" + ) + logging.info( + f" 🎬 Has video: {candidate.get('has_video', False)}" + ) + logging.info( + f" πŸ–ΌοΈ Has photo: {candidate.get('has_photo', False)}" + ) + logging.info( + f" πŸ” Is retweet: {getattr(tweet, 'is_retweet', False)}" + ) + + remember_posted_tweet( + state, + candidate, + bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}", + ) + save_state(state, STATE_PATH) + new_posts += 1 + continue + + link_meta_for_alt: dict = {} + if candidate.get("resolved_primary_external_url"): + try: + link_meta_for_alt = fetch_link_metadata( + candidate["resolved_primary_external_url"], + media_http_client, + ) + except Exception: + pass + + rich_text = make_rich(raw_text) + dynamic_alt = build_dynamic_alt( + full_clean_text, + link_title=link_meta_for_alt.get("title"), + ) + + image_embeds = [] + video_embed = None + external_embed = None + media_upload_failures = [] + + has_video = candidate.get("has_video", False) + + if has_video: + video_media = next( + ( + m + for m in (tweet.media or []) + if getattr(m, "type", None) == "video" + ), + None, + ) + + if video_media: + if not tweet.tweet_url: + logging.warning( + "⚠️ Tweet has video marker but no tweet URL. " + "Skipping video." + ) + media_upload_failures.append("video:no_tweet_url") + else: + temp_video_base = make_unique_video_temp_base( + tweet.tweet_url + ) + temp_video_path = f"{temp_video_base}.mp4" + + try: + real_video_url = ( + extract_video_url_from_tweet_page( + browser_context, tweet.tweet_url + ) + ) + if not real_video_url: + logging.warning( + f"⚠️ Could not resolve playable video URL " + f"for {tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:resolve_failed:{tweet.tweet_url}" + ) + else: + cropped_video_path = download_and_crop_video( + real_video_url, temp_video_path + ) + if not cropped_video_path: + logging.warning( + f"⚠️ Video download/crop failed for " + f"{tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:crop_failed:{tweet.tweet_url}" + ) + else: + video_blob = get_blob_from_file( + cropped_video_path, bsky_client + ) + if not video_blob: + logging.warning( + f"⚠️ Video upload blob failed for " + f"{tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:upload_failed:{tweet.tweet_url}" + ) + else: + video_embed = build_video_embed( + video_blob, dynamic_alt + ) + if not video_embed: + media_upload_failures.append( + f"video:embed_failed:{tweet.tweet_url}" + ) + finally: + remove_file_quietly(temp_video_path) + remove_file_quietly( + f"{temp_video_base}_source.mp4" + ) + remove_file_quietly( + f"{temp_video_base}_trimmed.mp4" + ) + remove_file_quietly( + f"{temp_video_base}_compressed.mp4" + ) + + if not video_embed: + logging.warning( + "⚠️ Tweet contains video, but video could not be " + "posted. Skipping photo fallback for this tweet." + ) + + else: + if tweet.media: + for media in tweet.media: + if media.type == "photo": + blob = get_blob_from_url( + media.media_url_https, + bsky_client, + media_http_client, + ) + if blob: + image_embeds.append( + models.AppBskyEmbedImages.Image( + alt=dynamic_alt, + image=blob, + ) + ) + else: + media_upload_failures.append( + f"photo:{media.media_url_https}" + ) + + # --- External link card logic --- + if not video_embed and not image_embeds: + candidate_url = candidate.get( + "resolved_primary_external_url" + ) + + if candidate_url: + if candidate.get("looks_like_title_plus_url"): + logging.info( + f"πŸ”— Detected title+URL post style. " + f"Using resolved URL for external card: " + f"{candidate_url}" + ) + else: + logging.info( + f"πŸ”— Using resolved first external URL for " + f"external card: {candidate_url}" + ) + + external_embed = build_external_link_embed( + candidate_url, + bsky_client, + media_http_client, + fallback_title="Link", + prefetched_metadata=link_meta_for_alt or None, + ) + + if external_embed: + logging.info( + f"βœ… Built external link card for URL: " + f"{candidate_url}" + ) + else: + logging.info( + f"ℹ️ Could not build external link card metadata " + f"for URL: {candidate_url}" + ) + + try: + post_result = None + post_mode = "text" + + if video_embed: + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + embed=video_embed, + langs=bsky_langs, + ) + post_mode = "video" + elif image_embeds: + embed = models.AppBskyEmbedImages.Main( + images=image_embeds + ) + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + embed=embed, + langs=bsky_langs, + ) + post_mode = f"images:{len(image_embeds)}" + elif external_embed: + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + embed=external_embed, + langs=bsky_langs, + ) + post_mode = "external_link_card" + else: + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + langs=bsky_langs, + ) + post_mode = "text_only" + + bsky_uri = getattr(post_result, "uri", None) + + remember_posted_tweet( + state, candidate, bsky_uri=bsky_uri + ) + state = prune_state(state, max_entries=5000) + save_state(state, STATE_PATH) + + recent_bsky_posts.insert( + 0, + { + "uri": bsky_uri, + "text": raw_text, + "normalized_text": candidate["normalized_text"], + "canonical_non_x_urls": candidate[ + "canonical_non_x_urls" + ], + "media_fingerprint": candidate["media_fingerprint"], + "text_media_key": candidate["text_media_key"], + "created_at": arrow.utcnow().isoformat(), + }, + ) + recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] + + new_posts += 1 + + if media_upload_failures: + logging.warning( + f"βœ… Posted tweet to Bluesky with degraded media " + f"mode ({post_mode}). " + f"Failed media items: {media_upload_failures}" + ) + else: + logging.info( + f"βœ… Posted new tweet to Bluesky with mode " + f"{post_mode}: {raw_text}" + ) + + time.sleep(5) + + except Exception as e: + logging.error( + f"❌ Failed to post tweet to Bluesky: {e}" + ) + + browser.close() + + logging.info( + f"βœ… Sync complete. Posted {new_posts} new updates." + ) + + except Exception as e: + logging.error(f"❌ Error during sync cycle: {e}") + +def main(): + load_dotenv() + + parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync") + parser.add_argument( + "--twitter-username", + help="Your Twitter login username", + ) + parser.add_argument( + "--twitter-password", + # NOTE (FIX #15): passwords passed via CLI are visible in `ps aux`. + # Prefer setting TWITTER_PASSWORD in your .env file instead. + help="Your Twitter login password", + ) + parser.add_argument( + "--twitter-email", + help="Your Twitter email for security challenges", + ) + parser.add_argument( + "--twitter-handle", + help="The Twitter account to scrape", + ) + parser.add_argument( + "--bsky-handle", + help="Your Bluesky handle", + ) + parser.add_argument( + "--bsky-password", + # NOTE (FIX #15): same warning as --twitter-password above. + # Prefer setting BSKY_APP_PASSWORD in your .env file instead. + help="Your Bluesky app password", + ) + parser.add_argument( + "--bsky-base-url", + help="Bluesky/ATProto PDS base URL, e.g. https://eurosky.social", + ) + parser.add_argument( + "--bsky-langs", + help="Comma-separated language codes for Bluesky posts (default: ca)", + default=None, + ) + parser.add_argument( + "--dry-run", + action="store_true", + default=False, + help=( + "Simulate sync without posting to Bluesky. " + "Logs what would be posted." + ), + ) + + args = parser.parse_args() + + # Resolve credentials: CLI args take priority, then env vars. + # FIX #15 β€” env vars are the secure path; CLI args expose secrets in + # the process list. Operators should prefer .env / environment variables. + args.twitter_username = args.twitter_username or os.getenv( + "TWITTER_USERNAME" + ) + args.twitter_password = args.twitter_password or os.getenv( + "TWITTER_PASSWORD" + ) + args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL") + args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") + args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") + args.twitter_handle = ( + args.twitter_handle + or os.getenv("TWITTER_HANDLE") + or args.twitter_username + ) + args.bsky_base_url = ( + args.bsky_base_url + if args.bsky_base_url + else DEFAULT_BSKY_BASE_URL + ) + + # --- Language handling: CLI > env > default (Catalan) --- + raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS") + if raw_langs: + args.bsky_langs = [ + lang.strip() + for lang in raw_langs.split(",") + if lang.strip() + ] + logging.info( + f"🌍 Using configured Bluesky languages: {args.bsky_langs}" + ) + else: + args.bsky_langs = DEFAULT_BSKY_LANGS + logging.info( + f"🌍 Using default Bluesky languages: {args.bsky_langs}" + ) + + missing_args = [] + if not args.twitter_username: + missing_args.append("--twitter-username / TWITTER_USERNAME") + if not args.twitter_password: + missing_args.append("--twitter-password / TWITTER_PASSWORD") + if not args.bsky_handle: + missing_args.append("--bsky-handle / BSKY_HANDLE") + if not args.bsky_password: + missing_args.append("--bsky-password / BSKY_APP_PASSWORD") + + if missing_args: + logging.error( + f"❌ Missing credentials! You forgot to provide: " + f"{', '.join(missing_args)}" + ) + return + + logging.info(f"πŸ€– Bot started. Will check @{args.twitter_handle}") + logging.info( + f"🌍 Posting destination base URL: {args.bsky_base_url}" + ) + + if args.dry_run: + logging.info( + "πŸ§ͺ DRY RUN MODE ENABLED β€” no posts will be created." + ) + + reset_caches() + sync_feeds(args) + logging.info("πŸ€– Bot finished.") if __name__ == "__main__": - main() + main() \ No newline at end of file