# Provenance: twitter2bsky_daemon.py as introduced by commit
# c6df6571538b10acf950e51f802709519ffc5bc3 ("Added JSON state",
# Guillem Hernandez Sola, 2026-04-04).
"""Mirror recent tweets from an X/Twitter profile to a Bluesky account.

One sync cycle scrapes the profile with Playwright, filters out tweets that
were already mirrored (using a local JSON state file plus the recent Bluesky
feed), and posts the remainder to Bluesky with their photos or video.
"""
import argparse
import hashlib
import json
import logging
import os
import re
import subprocess
import time
from urllib.parse import urlparse

import arrow
import httpx
from atproto import Client, client_utils, models
from dotenv import load_dotenv
from moviepy import VideoFileClip
from playwright.sync_api import sync_playwright

# --- Configuration ---
LOG_PATH = "twitter2bsky.log"
STATE_PATH = "twitter2bsky_state.json"
SCRAPE_TWEET_LIMIT = 30
DEDUPE_BSKY_LIMIT = 30
TWEET_MAX_AGE_DAYS = 3

# Playwright session settings shared by the scraper and the video extractor.
BROWSER_STATE_PATH = "twitter_browser_state.json"
BROWSER_USER_AGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/145.0.7632.6 Safari/537.36"
)
BROWSER_VIEWPORT = {"width": 1920, "height": 1080}
BROWSER_LAUNCH_ARGS = ["--disable-blink-features=AutomationControlled"]

# Videos are trimmed to this many seconds before upload.
VIDEO_MAX_SECONDS = 59
# The Bluesky images embed accepts at most this many images.
BSKY_MAX_IMAGES = 4

# Trailing ellipsis/dots that X appends to truncated URLs.
_URL_TRAILING_JUNK = r"[…\.]+$"
# Punctuation stripped from the end of a hashtag token.
_TAG_TRAILING_PUNCT = ".,;:!?)'\"…"

# --- Logging Setup ---
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
    level=logging.INFO,
)


# --- Custom Classes ---
class ScrapedMedia:
    """A single media attachment scraped from a tweet."""

    def __init__(self, url, media_type="photo"):
        self.type = media_type
        self.media_url_https = url


class ScrapedTweet:
    """A tweet scraped from the profile page.

    ``media_urls`` is an iterable of ``(url, media_type)`` pairs; each pair
    becomes a :class:`ScrapedMedia` in ``self.media``.
    """

    def __init__(self, created_on, text, media_urls, tweet_url=None):
        self.created_on = created_on
        self.text = text
        self.tweet_url = tweet_url
        self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls]


# --- Helpers ---
def take_error_screenshot(page, error_msg):
    """Save a timestamped screenshot of ``page`` for debugging.

    Best-effort: this runs inside callers' ``except`` handlers, so a failing
    screenshot is logged instead of raised to avoid masking the original error.
    """
    logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    screenshot_name = f"screenshot_{timestamp}.png"
    try:
        page.screenshot(path=screenshot_name)
    except Exception as e:
        logging.warning(f"⚠️ Could not save screenshot {screenshot_name}: {e}")
        return
    logging.info(f"📸 Screenshot saved as: {screenshot_name}")


def is_valid_url(url):
    """Return True if a HEAD request to ``url`` answers with a status below 500."""
    try:
        response = httpx.head(url, timeout=5, follow_redirects=True)
        return response.status_code < 500
    except Exception:
        return False


def clean_url(url):
    """Strip whitespace and trailing dots/ellipsis; return the URL if reachable, else None."""
    trimmed_url = url.strip()
    cleaned_url = re.sub(r"\s+", "", trimmed_url)
    cleaned_url = re.sub(_URL_TRAILING_JUNK, "", cleaned_url)

    if is_valid_url(cleaned_url):
        return cleaned_url
    return None


def canonicalize_url(url):
    """Return ``url`` stripped of surrounding whitespace, or None when empty."""
    if not url:
        return None
    return url.strip()


def canonicalize_tweet_url(url):
    """
    Canonicalize x.com/twitter.com status URLs for internal dedupe only.

    Status URLs become ``https://x.com/<lowercased handle>/status/<id>``; any
    other non-empty URL is returned stripped and lowercased.
    """
    if not url:
        return None

    url = url.strip()
    match = re.search(
        r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)",
        url,
        re.IGNORECASE,
    )
    if not match:
        return url.lower()

    handle = match.group(1).lower()
    tweet_id = match.group(2)
    return f"https://x.com/{handle}/status/{tweet_id}"


def is_x_or_twitter_domain(url):
    """Return True when the URL's hostname is one of the known X/Twitter hosts."""
    try:
        hostname = (urlparse(url).hostname or "").lower()
        return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"}
    except Exception:
        return False


def extract_urls_from_text(text):
    """Return every ``http(s)://`` token found in ``text`` (empty list for no text)."""
    if not text:
        return []
    return re.findall(r"https?://[^\s]+", text)


def extract_non_x_urls_from_text(text):
    """Return URLs from ``text`` that do not point at X/Twitter, without trailing dots."""
    result = []

    for url in extract_urls_from_text(text):
        cleaned = re.sub(_URL_TRAILING_JUNK, "", url.strip())
        if cleaned and not is_x_or_twitter_domain(cleaned):
            result.append(cleaned)

    return result


def extract_urls_from_facets(record):
    """
    Extract link URLs from Bluesky rich text facets if present.
    """
    urls = []

    try:
        facets = getattr(record, "facets", None) or []
        for facet in facets:
            features = getattr(facet, "features", None) or []
            for feature in features:
                uri = getattr(feature, "uri", None)
                if uri:
                    urls.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")

    return urls


def get_blob_from_url(media_url, client):
    """Download ``media_url`` and upload it to Bluesky; return the blob or None."""
    try:
        r = httpx.get(media_url, timeout=30, follow_redirects=True)
        if r.status_code == 200:
            return client.upload_blob(r.content).blob
        # Previously a non-200 answer was dropped without any trace in the log.
        logging.warning(f"Could not fetch media {media_url}: HTTP {r.status_code}")
    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {e}")
    return None


def get_blob_from_file(file_path, client):
    """Upload the local file at ``file_path`` to Bluesky; return the blob or None."""
    try:
        with open(file_path, "rb") as f:
            return client.upload_blob(f.read()).blob
    except Exception as e:
        logging.warning(f"Could not upload local file {file_path}: {e}")
        return None


def prepare_post_text(text):
    """
    Prepare the final public text exactly as it should be posted to Bluesky.
    Does NOT append the source X URL.

    Text longer than 295 characters is cut to at most 290 characters (at the
    last space when there is one) and suffixed with "...".
    """
    raw_text = (text or "").strip()

    if len(raw_text) > 295:
        truncated = raw_text[:290]
        last_space = truncated.rfind(" ")
        if last_space > 0:
            raw_text = truncated[:last_space] + "..."
        else:
            raw_text = truncated + "..."

    return raw_text


def normalize_post_text(text):
    """
    Normalize post text for duplicate detection.

    Collapses all whitespace runs to single spaces and lowercases the result.
    """
    if not text:
        return ""

    text = text.replace("\r", "\n")
    text = re.sub(r"\s+", " ", text).strip()
    return text.lower()


def build_media_fingerprint(tweet):
    """
    Build a deterministic media fingerprint from scraped tweet media.

    Returns "no-media" for tweets without media, otherwise a SHA-256 hex digest
    over the sorted ``type:value`` parts.
    """
    if not tweet or not tweet.media:
        return "no-media"

    parts = []

    for media in tweet.media:
        media_type = getattr(media, "type", "unknown")
        media_url = getattr(media, "media_url_https", "") or ""

        stable_value = media_url

        if media_type == "photo":
            # Size/format query parameters vary between scrapes of the same image.
            stable_value = re.sub(r"[?&]name=\w+", "", stable_value)
            stable_value = re.sub(r"[?&]format=\w+", "", stable_value)
        elif media_type == "video":
            # Video stream URLs are not stable; key on the tweet URL instead.
            stable_value = canonicalize_tweet_url(tweet.tweet_url or media_url or "")

        parts.append(f"{media_type}:{stable_value}")

    parts.sort()
    raw = "|".join(parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def build_bsky_media_fingerprint(post_view):
    """
    Best-effort media fingerprint from Bluesky embed structure.

    Returns "no-media" when the post has no recognised embed or on any error.
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"

        parts = []

        images = getattr(embed, "images", None)
        if images:
            for img in images:
                image_obj = getattr(img, "image", None)
                ref = getattr(image_obj, "ref", None) or getattr(image_obj, "cid", None) or str(image_obj)
                parts.append(f"photo:{ref}")

        video = getattr(embed, "video", None)
        if video:
            ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
            parts.append(f"video:{ref}")

        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")

        if not parts:
            return "no-media"

        parts.sort()
        raw = "|".join(parts)
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"


def build_text_media_key(normalized_text, media_fingerprint):
    """Return the SHA-256 hex digest combining normalized text and media fingerprint."""
    return hashlib.sha256(f"{normalized_text}||{media_fingerprint}".encode("utf-8")).hexdigest()


# --- Local State Management ---
def default_state():
    """Return a fresh, empty state dictionary."""
    return {
        "version": 1,
        "posted_tweets": {},
        "posted_by_bsky_uri": {},
        "updated_at": None,
    }


def load_state(state_path=STATE_PATH):
    """Load the JSON state file, falling back to an empty state when unusable.

    Missing keys are filled in. The two mapping sections are reset to ``{}``
    when the file holds anything other than a JSON object for them (for
    example ``null``), because later lookups require dictionaries.
    """
    if not os.path.exists(state_path):
        logging.info(f"🧠 No state file found at {state_path}. Starting with empty memory.")
        return default_state()

    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)

        if not isinstance(state, dict):
            logging.warning("⚠️ State file is invalid. Reinitializing.")
            return default_state()

        state.setdefault("version", 1)
        for section in ("posted_tweets", "posted_by_bsky_uri"):
            if not isinstance(state.get(section), dict):
                if section in state:
                    logging.warning(f"⚠️ State section '{section}' is not an object. Resetting it.")
                state[section] = {}
        state.setdefault("updated_at", None)

        return state

    except Exception as e:
        logging.warning(f"⚠️ Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()


def save_state(state, state_path=STATE_PATH):
    """Write ``state`` to ``state_path`` via a temp file and ``os.replace``.

    Updates ``state["updated_at"]`` first. Failures are logged, not raised.
    """
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        temp_path = f"{state_path}.tmp"

        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)

        os.replace(temp_path, state_path)
        logging.info(f"💾 State saved to {state_path}")

    except Exception as e:
        logging.error(f"❌ Failed to save state file {state_path}: {e}")


def remember_posted_tweet(state, candidate, bsky_uri=None):
    """
    Store successful post in local state.
    Primary key is canonical tweet URL when available.
    Fallback key uses text_media_key.
    """
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    fallback_key = f"textmedia:{candidate['text_media_key']}"
    state_key = canonical_tweet_url or fallback_key

    record = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        # Sets are not JSON-serializable; store a sorted list instead.
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "bsky_uri": bsky_uri,
        "tweet_created_on": candidate["tweet"].created_on,
        "tweet_url": candidate["tweet"].tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }

    state["posted_tweets"][state_key] = record

    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key


def candidate_matches_state(candidate, state):
    """
    Strong private dedupe using local persistent state.
    Match order:
    1. canonical tweet URL
    2. text + media fingerprint
    3. normalized text

    Returns ``(True, reason)`` on a match, otherwise ``(False, None)``.
    """
    canonical_tweet_url = candidate["canonical_tweet_url"]
    text_media_key = candidate["text_media_key"]
    normalized_text = candidate["normalized_text"]

    posted_tweets = state.get("posted_tweets") or {}

    if canonical_tweet_url and canonical_tweet_url in posted_tweets:
        return True, "state:tweet_url"

    # Ignore malformed (non-dict) records instead of crashing on ``.get``.
    records = [record for record in posted_tweets.values() if isinstance(record, dict)]

    if any(record.get("text_media_key") == text_media_key for record in records):
        return True, "state:text_media_fingerprint"

    if any(record.get("normalized_text") == normalized_text for record in records):
        return True, "state:normalized_text"

    return False, None


def prune_state(state, max_entries=5000):
    """
    Keep state file from growing forever.
    Prunes oldest records by posted_at if necessary.

    Records without ``posted_at`` sort as oldest. Returns the same ``state``
    object, updated in place.
    """
    posted_tweets = state.get("posted_tweets", {})

    if len(posted_tweets) <= max_entries:
        return state

    newest_first = sorted(
        posted_tweets.items(),
        key=lambda item: item[1].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = {key for key, _ in newest_first[:max_entries]}

    state["posted_tweets"] = {
        key: record for key, record in posted_tweets.items() if key in keep_keys
    }
    state["posted_by_bsky_uri"] = {
        bsky_uri: key
        for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items()
        if key in keep_keys
    }
    return state


# --- Bluesky Post History ---
def get_recent_bsky_posts(client, handle, limit=30):
    """
    Fetch recent top-level Bluesky posts for duplicate detection.
    Returns a list of dicts with dedupe keys.

    Reposts (items with a ``reason``) and replies are skipped. Errors are
    logged and result in a shorter (possibly empty) list.
    """
    recent_posts = []

    try:
        timeline = client.get_author_feed(handle, limit=limit)

        for item in timeline.feed:
            try:
                if item.reason is not None:
                    continue

                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue

                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)

                urls = []
                urls.extend(extract_non_x_urls_from_text(text))
                urls.extend(extract_urls_from_facets(record))

                canonical_non_x_urls = set()
                for url in urls:
                    if not is_x_or_twitter_domain(url):
                        canonical = canonicalize_url(url)
                        if canonical:
                            canonical_non_x_urls.add(canonical)

                media_fingerprint = build_bsky_media_fingerprint(item.post)
                text_media_key = build_text_media_key(normalized_text, media_fingerprint)

                recent_posts.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalized_text,
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": text_media_key,
                    "created_at": getattr(record, "created_at", None),
                })

            except Exception as e:
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")

    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}")

    return recent_posts


def _repair_url_match(match):
    """Repair a URL match that may have been split across lines by the scraper."""
    raw = match.group(0)

    if "\n" not in raw and "\r" not in raw:
        return re.sub(_URL_TRAILING_JUNK, "", raw)

    glued = raw.replace("\n", "").replace("\r", "")
    test_url = re.sub(_URL_TRAILING_JUNK, "", glued)

    if is_valid_url(test_url):
        return test_url

    # If the part before the line break is already a valid URL, the break was
    # a genuine newline: keep the original text untouched.
    parts = raw.split("\n")
    test_part0 = re.sub(_URL_TRAILING_JUNK, "", parts[0])
    if is_valid_url(test_part0):
        return raw

    return test_url


def make_rich(content):
    """Build a Bluesky ``TextBuilder`` with link and hashtag facets from plain text.

    URLs are upgraded to https, cleaned and validated once (``clean_url``
    already performs the reachability check). Hashtags are emitted as tag
    facets without their trailing punctuation; a bare "#" stays plain text.
    """
    text_builder = client_utils.TextBuilder()

    # The pattern deliberately allows newlines inside the match so that URLs
    # broken across lines can be glued back together.
    content = re.sub(r"https?://[^\ \t]+", _repair_url_match, (content or "").strip())
    lines = content.splitlines()

    for line_idx, line in enumerate(lines):
        is_last_line = line_idx == len(lines) - 1

        if not line.strip():
            if not is_last_line:
                text_builder.text("\n")
            continue

        words = line.split(" ")
        for i, word in enumerate(words):
            is_last_word = i == len(words) - 1

            if not word:
                if not is_last_word:
                    text_builder.text(" ")
                continue

            if word.startswith(("http://", "https://")):
                if word.startswith("http://"):
                    word = word.replace("http://", "https://", 1)

                word = re.sub(_URL_TRAILING_JUNK, "", word)
                clean_url_value = clean_url(word)

                if clean_url_value:
                    text_builder.link(clean_url_value, clean_url_value)
                else:
                    text_builder.text(word)

            elif word.startswith("#"):
                clean_tag = word[1:].rstrip(_TAG_TRAILING_PUNCT)
                if clean_tag:
                    text_builder.tag(f"#{clean_tag}", clean_tag)
                    trailing = word[1 + len(clean_tag):]
                    if trailing:
                        text_builder.text(trailing)
                else:
                    # "#" alone (or "#" plus punctuation) is not a hashtag.
                    text_builder.text(word)

            else:
                text_builder.text(word)

            if not is_last_word:
                text_builder.text(" ")

        if not is_last_line:
            text_builder.text("\n")

    return text_builder


def build_dynamic_alt(raw_text):
    """Derive media alt text from the post text (URLs removed, max 150 chars).

    Falls back to a fixed Catalan sentence when nothing is left.
    """
    dynamic_alt = raw_text.replace("\n", " ").strip()
    dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip()

    if len(dynamic_alt) > 150:
        dynamic_alt = dynamic_alt[:147] + "..."
    elif not dynamic_alt:
        dynamic_alt = "Vídeo o imatge adjunta al tuit"

    return dynamic_alt


def build_video_embed(video_blob, alt_text):
    """Return a Bluesky video embed, or None when the atproto SDK lacks video support."""
    try:
        return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None


# --- Playwright Scraping ---
def _new_browser_context(browser, storage_state=None):
    """Create a browser context with the shared user agent and viewport."""
    kwargs = {"user_agent": BROWSER_USER_AGENT, "viewport": BROWSER_VIEWPORT}
    if storage_state:
        kwargs["storage_state"] = storage_state
    return browser.new_context(**kwargs)


def _login_to_x(page, username, password, email):
    """Drive the X login form on ``page``; raises on any failure or timeout."""
    page.goto("https://x.com")
    sign_in_button = page.get_by_text("Sign in", exact=True)
    sign_in_button.wait_for(state="visible", timeout=15000)
    sign_in_button.click(force=True)

    page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000)
    logging.info(f"👀 Entering username: {username}...")
    time.sleep(1)

    username_input = page.locator('input[autocomplete="username"]')
    username_input.wait_for(state="visible", timeout=15000)
    username_input.click(force=True)
    username_input.press_sequentially(username, delay=100)

    page.locator('button:has-text("Next")').first.click(force=True)
    page.wait_for_selector(
        'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
        timeout=15000
    )
    time.sleep(1)

    challenge_visible = (
        page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible()
        or page.locator('input[name="text"]').is_visible()
    )
    if challenge_visible:
        logging.warning("🛡️ Security challenge detected! Entering email/phone...")
        if not email:
            # Fail with a clear message rather than an opaque fill() error.
            raise RuntimeError("security challenge requires an email/phone but none was provided")
        page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
        sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first
        if sec_next.is_visible():
            sec_next.click(force=True)
        else:
            page.keyboard.press("Enter")
        page.wait_for_selector('input[name="password"]', timeout=15000)
        time.sleep(1)

    logging.info("🔑 Entering password...")
    page.fill('input[name="password"]', password)
    page.locator('span:has-text("Log in")').first.click()

    page.wait_for_url("**/home", timeout=20000)
    time.sleep(3)


def _parse_tweet_article(article):
    """Convert one ``<article>`` locator into a ScrapedTweet, or None if it has no timestamp."""
    time_el = article.locator("time").first
    if not time_el.is_visible():
        return None

    created_at = time_el.get_attribute("datetime")

    tweet_url = None
    time_link = article.locator("a:has(time)").first
    if time_link.is_visible():
        href = time_link.get_attribute("href")
        if href:
            tweet_url = f"https://x.com{href}" if href.startswith("/") else href

    text_locator = article.locator('[data-testid="tweetText"]').first
    text = text_locator.inner_text() if text_locator.is_visible() else ""

    media_urls = []

    for img in article.locator('[data-testid="tweetPhoto"] img').all():
        src = img.get_attribute("src")
        if src:
            # Ask for the large rendition instead of the thumbnail.
            src = re.sub(r"&name=\w+", "&name=large", src)
            media_urls.append((src, "photo"))

    if article.locator('[data-testid="videoPlayer"]').all():
        # The real stream URL is resolved later from the tweet page.
        media_urls.append((tweet_url or "", "video"))

    return ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url)


def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Scrape the visible tweets of ``target_handle`` using a logged-in browser.

    Reuses the saved browser session when it is still valid, otherwise logs
    in and saves a new one. Returns a list of ScrapedTweet (empty when login
    or scraping fails). The browser is always closed before returning.
    """
    tweets = []
    state_file = BROWSER_STATE_PATH

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True, args=BROWSER_LAUNCH_ARGS)

        try:
            context = None
            needs_login = True

            if os.path.exists(state_file):
                logging.info("✅ Found existing browser state. Attempting to bypass login...")
                context = _new_browser_context(browser, storage_state=state_file)
                page = context.new_page()
                page.goto("https://x.com/home")
                time.sleep(4)

                if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
                    logging.info("✅ Session is valid!")
                    needs_login = False
                else:
                    logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
                    context.close()
                    os.remove(state_file)

            if needs_login:
                logging.info("🚀 Launching fresh browser for automated Twitter login...")
                context = _new_browser_context(browser)
                page = context.new_page()

                try:
                    _login_to_x(page, username, password, email)
                    context.storage_state(path=state_file)
                    logging.info("✅ Login successful. Browser state saved.")
                except Exception as e:
                    take_error_screenshot(page, "login_failed")
                    logging.error(f"❌ Login failed: {e}")
                    return []

            logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
            page = context.new_page()

            try:
                page.goto(f"https://x.com/{target_handle}")
                page.wait_for_selector("article", timeout=20000)
                time.sleep(3)

                articles = page.locator("article").all()
                logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...")

                for article in articles[:SCRAPE_TWEET_LIMIT]:
                    try:
                        tweet = _parse_tweet_article(article)
                        if tweet is not None:
                            tweets.append(tweet)
                    except Exception as e:
                        logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")

            except Exception as e:
                take_error_screenshot(page, "scrape_failed")
                logging.error(f"❌ Failed to scrape profile: {e}")

            return tweets

        finally:
            browser.close()


def _wait_until(predicate, seconds):
    """Poll ``predicate`` once per second for up to ``seconds`` seconds."""
    for _ in range(seconds):
        if predicate():
            return
        time.sleep(1)


def extract_video_url_from_tweet_page(context, tweet_url):
    """Open ``tweet_url`` and sniff network responses for a playable video URL.

    Prefers the first HLS playlist seen over the first non-audio MP4. Returns
    the selected URL or None. The page opened here is always closed.
    """
    page = context.new_page()
    best_m3u8_url = None
    best_video_mp4_url = None
    seen_urls = set()

    def is_audio_only_mp4(url, content_type):
        url_l = url.lower()
        content_type_l = content_type.lower()
        return (
            "/aud/" in url_l or
            "/audio/" in url_l or
            "mp4a" in url_l or
            ("audio/" in content_type_l and "video/" not in content_type_l)
        )

    def handle_response(response):
        nonlocal best_m3u8_url, best_video_mp4_url
        try:
            url = response.url
            if url in seen_urls:
                return
            seen_urls.add(url)

            url_l = url.lower()
            content_type = response.headers.get("content-type", "")
            content_type_l = content_type.lower()

            # Individual media segments are never a usable download source.
            if ".m4s" in url_l:
                return

            if (
                ".m3u8" in url_l or
                "application/vnd.apple.mpegurl" in content_type_l or
                "application/x-mpegurl" in content_type_l
            ):
                if best_m3u8_url is None:
                    best_m3u8_url = url
                    logging.info(f"📺 Found HLS playlist URL: {url}")
                return

            if ".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l:
                if is_audio_only_mp4(url, content_type):
                    logging.info(f"🔇 Ignoring audio-only MP4: {url}")
                    return

                if best_video_mp4_url is None:
                    best_video_mp4_url = url
                    logging.info(f"🎥 Found VIDEO MP4 URL: {url}")
                return

        except Exception as e:
            logging.debug(f"Response parsing error: {e}")

    page.on("response", handle_response)

    def current_best():
        return best_m3u8_url or best_video_mp4_url

    try:
        logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
        page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(3)

        player = page.locator('[data-testid="videoPlayer"]').first

        if player.count() > 0:
            try:
                player.scroll_into_view_if_needed(timeout=5000)
            except Exception as e:
                logging.debug(f"Could not scroll video player into view: {e}")

            try:
                player.click(force=True, timeout=5000)
                logging.info("▶️ Clicked video player")
            except Exception as e:
                logging.info(f"⚠️ First player click failed: {e}")
        else:
            logging.warning("⚠️ No video player locator found on tweet page")

        _wait_until(current_best, 12)

        if not current_best() and player.count() > 0:
            logging.info("🔁 No media URL found yet, retrying player interaction...")
            try:
                player.click(force=True, timeout=5000)
                time.sleep(2)
            except Exception as e:
                logging.info(f"⚠️ Retry click failed: {e}")

            try:
                page.keyboard.press("Space")
                time.sleep(1)
            except Exception as e:
                logging.debug(f"Could not send Space key to player: {e}")

            _wait_until(current_best, 8)

        selected_url = current_best()
        if selected_url:
            logging.info(f"✅ Selected media URL for download: {selected_url}")
        else:
            logging.warning(f"⚠️ No playable media URL detected on tweet page: {tweet_url}")

        return selected_url

    except Exception as e:
        logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}")
        return None
    finally:
        page.close()


# --- Video Processing ---
def _trim_video(source_path, target_path, max_seconds):
    """Re-encode the first ``max_seconds`` of ``source_path`` into ``target_path``.

    Returns True on success, False when the source has no usable duration.
    Clip handles are always closed, even if encoding raises.
    """
    video_clip = None
    cropped_clip = None
    try:
        video_clip = VideoFileClip(source_path)
        duration = float(video_clip.duration) if video_clip.duration else 0

        if duration <= 0:
            logging.error("❌ Downloaded video has invalid or unknown duration.")
            return False

        end_time = min(max_seconds, duration)

        # moviepy 2.x renamed ``subclip`` to ``subclipped``; support both.
        if hasattr(video_clip, "subclipped"):
            cropped_clip = video_clip.subclipped(0, end_time)
        else:
            cropped_clip = video_clip.subclip(0, end_time)

        cropped_clip.write_videofile(
            target_path,
            codec="libx264",
            audio_codec="aac",
            logger=None
        )
        return True
    finally:
        for clip in (video_clip, cropped_clip):
            if clip is not None:
                try:
                    clip.close()
                except Exception as e:
                    logging.debug(f"Could not close video clip: {e}")


def download_and_crop_video(video_url, output_path):
    """Download ``video_url`` with ffmpeg and trim it to ``VIDEO_MAX_SECONDS``.

    Returns ``output_path`` on success, None on any failure. Temporary files
    next to ``output_path`` are removed in all cases.
    """
    # splitext keeps the temp names distinct from output_path even when the
    # path has no ".mp4" suffix (str.replace would have left them identical).
    base, ext = os.path.splitext(output_path)
    ext = ext or ".mp4"
    temp_input = f"{base}_source{ext}"
    temp_output = f"{base}_cropped{ext}"

    try:
        logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}")

        if ".m3u8" in video_url.lower():
            logging.info("📺 Using HLS ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        else:
            logging.info("🎥 Using direct MP4 ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]

        download_result = subprocess.run(
            download_cmd,
            capture_output=True,
            text=True
        )

        if download_result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{download_result.stderr}")
            return None

        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded video source file is missing or empty.")
            return None

        logging.info(f"✅ Video downloaded: {temp_input}")

        if not _trim_video(temp_input, temp_output, VIDEO_MAX_SECONDS):
            return None

        if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
            logging.error("❌ Cropped video output is missing or empty.")
            return None

        os.replace(temp_output, output_path)
        logging.info(f"✅ Video cropped to {VIDEO_MAX_SECONDS} seconds: {output_path}")
        return output_path

    except Exception as e:
        logging.error(f"❌ Error processing video: {e}")
        return None

    finally:
        for path in (temp_input, temp_output):
            if os.path.exists(path):
                try:
                    os.remove(path)
                except OSError as e:
                    logging.debug(f"Could not remove temp file {path}: {e}")


def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """
    Multi-signal dedupe against recent Bluesky posts.

    Returns ``(True, reason)`` on the first matching post, else ``(False, None)``.
    """
    candidate_non_x_urls = candidate["canonical_non_x_urls"]
    candidate_text_media_key = candidate["text_media_key"]
    candidate_normalized_text = candidate["normalized_text"]

    for existing in recent_bsky_posts:
        existing_non_x_urls = existing["canonical_non_x_urls"]

        if (
            candidate_non_x_urls and
            candidate_non_x_urls == existing_non_x_urls and
            candidate_normalized_text == existing["normalized_text"]
        ):
            return True, "bsky:normalized_text_plus_non_x_urls"

        if candidate_text_media_key == existing["text_media_key"]:
            return True, "bsky:text_media_fingerprint"

        if candidate_normalized_text == existing["normalized_text"]:
            return True, "bsky:normalized_text"

    return False, None


# --- Main Sync Function ---
def _prepare_candidates(tweets, too_old_cutoff):
    """Turn scraped tweets (oldest first) into candidate dicts with dedupe keys."""
    candidates = []

    for tweet in reversed(tweets):
        try:
            tweet_time = arrow.get(tweet.created_on)

            if tweet_time < too_old_cutoff:
                logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
                continue

            prepared_text = prepare_post_text(tweet.text)
            normalized_text = normalize_post_text(prepared_text)

            if not normalized_text:
                logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
                continue

            media_fingerprint = build_media_fingerprint(tweet)
            text_media_key = build_text_media_key(normalized_text, media_fingerprint)

            canonical_non_x_urls = set()
            for url in extract_non_x_urls_from_text(prepared_text):
                canonical = canonicalize_url(url)
                if canonical:
                    canonical_non_x_urls.add(canonical)

            candidates.append({
                "tweet": tweet,
                "tweet_time": tweet_time,
                "raw_text": prepared_text,
                "normalized_text": normalized_text,
                "media_fingerprint": media_fingerprint,
                "text_media_key": text_media_key,
                "canonical_tweet_url": canonicalize_tweet_url(tweet.tweet_url),
                "canonical_non_x_urls": canonical_non_x_urls,
            })

        except Exception as e:
            logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")

    return candidates


def _find_duplicate(candidate, state, recent_bsky_posts):
    """Return a log-ready description of why ``candidate`` is a duplicate, or None."""
    is_dup_state, reason_state = candidate_matches_state(candidate, state)
    if is_dup_state:
        return f"local state duplicate match on: {reason_state}"

    is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
    if is_dup_bsky:
        return f"recent Bluesky duplicate match on: {reason_bsky}"

    return None


def _upload_tweet_video(context, tweet, bsky_client, alt_text):
    """Resolve, download, trim and upload the tweet's video; return an embed or None."""
    temp_video_path = "temp_video.mp4"

    try:
        real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url)
        if not real_video_url:
            logging.warning(f"⚠️ Could not resolve playable video URL for {tweet.tweet_url}")
            return None

        cropped_video_path = download_and_crop_video(real_video_url, temp_video_path)
        if not cropped_video_path:
            logging.warning(f"⚠️ Video download/crop failed for {tweet.tweet_url}")
            return None

        video_blob = get_blob_from_file(cropped_video_path, bsky_client)
        if not video_blob:
            logging.warning(f"⚠️ Video upload blob failed for {tweet.tweet_url}")
            return None

        return build_video_embed(video_blob, alt_text)

    finally:
        if os.path.exists(temp_video_path):
            os.remove(temp_video_path)


def _build_post_embed(context, tweet, bsky_client, alt_text):
    """Build the Bluesky embed for a tweet: video if available, else images, else None."""
    image_embeds = []
    video_embed = None

    for media in tweet.media or []:
        if media.type == "photo":
            blob = get_blob_from_url(media.media_url_https, bsky_client)
            if blob:
                image_embeds.append(
                    models.AppBskyEmbedImages.Image(alt=alt_text, image=blob)
                )

        elif media.type == "video":
            if not tweet.tweet_url:
                logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.")
                continue

            embed = _upload_tweet_video(context, tweet, bsky_client, alt_text)
            if embed is not None:
                video_embed = embed

    if video_embed:
        return video_embed
    if image_embeds:
        return models.AppBskyEmbedImages.Main(images=image_embeds[:BSKY_MAX_IMAGES])
    return None


def sync_feeds(args):
    """Run one full sync cycle: scrape, dedupe, post, and persist state.

    ``args`` must provide ``twitter_username``, ``twitter_password``,
    ``twitter_email``, ``twitter_handle``, ``bsky_handle`` and
    ``bsky_password``. All errors are logged; nothing is raised.
    """
    logging.info("🔄 Starting sync cycle...")
    try:
        state = load_state(STATE_PATH)

        tweets = scrape_tweets_via_playwright(
            args.twitter_username,
            args.twitter_password,
            args.twitter_email,
            args.twitter_handle
        )

        if not tweets:
            logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
            return

        bsky_client = Client()
        bsky_client.login(args.bsky_handle, args.bsky_password)

        recent_bsky_posts = get_recent_bsky_posts(
            bsky_client,
            args.bsky_handle,
            limit=DEDUPE_BSKY_LIMIT
        )

        logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.")
        logging.info(f"🧠 Local state currently tracks {len(state.get('posted_tweets', {}))} posted items.")

        too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS)
        logging.info(f"🕒 Will ignore tweets older than: {too_old_cutoff}")

        candidate_tweets = _prepare_candidates(tweets, too_old_cutoff)
        logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for duplicate comparison.")

        tweets_to_post = []
        for candidate in candidate_tweets:
            duplicate_reason = _find_duplicate(candidate, state, recent_bsky_posts)
            if duplicate_reason:
                logging.info(f"⏭️ Skipping candidate due to {duplicate_reason}")
                continue
            tweets_to_post.append(candidate)

        logging.info(f"📬 {len(tweets_to_post)} tweets remain after duplicate filtering.")

        if not tweets_to_post:
            logging.info("✅ No new tweets need posting after duplicate comparison.")
            return

        new_posts = 0

        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True, args=BROWSER_LAUNCH_ARGS)

            try:
                saved_session = BROWSER_STATE_PATH if os.path.exists(BROWSER_STATE_PATH) else None
                context = _new_browser_context(browser, storage_state=saved_session)

                for candidate in tweets_to_post:
                    tweet = candidate["tweet"]
                    tweet_time = candidate["tweet_time"]
                    raw_text = candidate["raw_text"]

                    # Re-check: an earlier post in this same batch may have
                    # made this candidate a duplicate.
                    duplicate_reason = _find_duplicate(candidate, state, recent_bsky_posts)
                    if duplicate_reason:
                        logging.info(f"⏭️ Skipping candidate due to {duplicate_reason}")
                        continue

                    logging.info(f"📝 Posting missing tweet from {tweet_time} to Bluesky...")

                    # One failing tweet must not abort the rest of the batch.
                    try:
                        rich_text = make_rich(raw_text)
                        dynamic_alt = build_dynamic_alt(raw_text)
                        embed = _build_post_embed(context, tweet, bsky_client, dynamic_alt)

                        if embed is not None:
                            post_result = bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
                        else:
                            post_result = bsky_client.send_post(text=rich_text, langs=["ca"])

                        bsky_uri = getattr(post_result, "uri", None)

                        remember_posted_tweet(state, candidate, bsky_uri=bsky_uri)
                        state = prune_state(state, max_entries=5000)
                        save_state(state, STATE_PATH)

                        recent_bsky_posts.insert(0, {
                            "uri": bsky_uri,
                            "text": raw_text,
                            "normalized_text": candidate["normalized_text"],
                            "canonical_non_x_urls": candidate["canonical_non_x_urls"],
                            "media_fingerprint": candidate["media_fingerprint"],
                            "text_media_key": candidate["text_media_key"],
                            "created_at": arrow.utcnow().isoformat(),
                        })
                        recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]

                        new_posts += 1
                        logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}")
                        # Short pause between posts.
                        time.sleep(5)

                    except Exception as e:
                        logging.error(f"❌ Failed to post tweet to Bluesky: {e}")

            finally:
                browser.close()

        logging.info(f"✅ Sync complete. Posted {new_posts} new updates.")

    except Exception as e:
        # logging.exception keeps the traceback for post-mortem debugging.
        logging.exception(f"❌ Error during sync cycle: {e}")


# --- Main Execution ---
def main():
    """Parse CLI arguments (with .env/environment fallbacks) and run one sync cycle."""
    load_dotenv()

    parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
    parser.add_argument("--twitter-username", help="Your Twitter login username")
    parser.add_argument("--twitter-password", help="Your Twitter login password")
    parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
    parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
    parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    parser.add_argument("--bsky-password", help="Your Bluesky app password")

    args = parser.parse_args()

    # Command-line values win; environment variables fill the gaps.
    args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME")
    args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD")
    args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL")
    args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
    args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
    args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username

    required = {
        "--twitter-username": args.twitter_username,
        "--twitter-password": args.twitter_password,
        "--bsky-handle": args.bsky_handle,
        "--bsky-password": args.bsky_password,
    }
    missing_args = [flag for flag, value in required.items() if not value]

    if missing_args:
        logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
        return

    if not args.twitter_email:
        logging.warning("⚠️ No Twitter email provided; login will fail if X shows a security challenge.")

    logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
    sync_feeds(args)
    logging.info("🤖 Bot finished.")


if __name__ == "__main__":
    main()