"""Twitter -> Bluesky sync daemon: scraping models, URL and dedupe helpers."""

import argparse
import arrow
import hashlib
import logging
import re
import httpx
import time
import os
import subprocess
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip

# --- Configuration ---
LOG_PATH = "twitter2bsky.log"
SCRAPE_TWEET_LIMIT = 30          # max on-screen articles parsed per scrape pass
DEDUPE_BSKY_LIMIT = 30           # recent Bluesky posts kept for duplicate detection
TWEET_MAX_AGE_DAYS = 3           # tweets older than this are never reposted
APPEND_SOURCE_TWEET_URL = True   # append canonical x.com URL for stronger dedupe

# --- Logging Setup ---
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
    level=logging.INFO,
)


# --- Custom Classes ---
class ScrapedMedia:
    """One media attachment scraped from a tweet (type: "photo" or "video")."""

    def __init__(self, url, media_type="photo"):
        # Attribute names mirror the legacy Twitter API media object shape.
        self.type = media_type
        self.media_url_https = url


class ScrapedTweet:
    """A tweet scraped via Playwright: timestamp, text, media and permalink."""

    def __init__(self, created_on, text, media_urls, tweet_url=None):
        # created_on is the ISO datetime string from the <time> element.
        self.created_on = created_on
        self.text = text
        self.tweet_url = tweet_url
        # media_urls is a list of (url, media_type) tuples.
        self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls]


# --- Helpers ---
def take_error_screenshot(page, error_msg):
    """Save a timestamped screenshot of *page* for post-mortem debugging."""
    logging.info(f"πŸ“Έ Taking screenshot... Shot: {error_msg}")
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    screenshot_name = f"screenshot_{timestamp}.png"
    page.screenshot(path=screenshot_name)
    logging.info(f"πŸ“Έ Screenshot saved as: {screenshot_name}")


def is_valid_url(url):
    """Return True if *url* answers a HEAD request with a non-5xx status.

    NOTE: 4xx responses count as "valid" here on purpose — the caller only
    wants to know the host resolves and responds, not that the page exists.
    """
    try:
        response = httpx.head(url, timeout=5, follow_redirects=True)
        return response.status_code < 500
    except Exception:
        return False


def clean_url(url):
    """Strip whitespace and trailing ellipsis/dots from *url*.

    Returns the cleaned URL if it is reachable, else None.
    """
    trimmed_url = url.strip()
    cleaned_url = re.sub(r"\s+", "", trimmed_url)
    # Tweets often truncate links with "…" or trailing periods.
    cleaned_url = re.sub(r"[…\.]+$", "", cleaned_url)

    if is_valid_url(cleaned_url):
        return cleaned_url
    return None


def canonicalize_tweet_url(url):
    """
    Canonicalize x.com/twitter.com status URLs for dedupe.

    Non-status URLs are returned lowercased; None stays None.
    """
    if not url:
        return None

    url = url.strip()

    match = re.search(r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", url, re.IGNORECASE)
    if not match:
        return url.lower()

    handle = match.group(1).lower()
    tweet_id = match.group(2)
    return f"https://x.com/{handle}/status/{tweet_id}"


def extract_urls_from_text(text):
    """Return every http(s) URL found in *text* (empty list for falsy input)."""
    if not text:
        return []
    return re.findall(r"https?://[^\s]+", text)


def extract_urls_from_facets(record):
    """
    Extract link URLs from Bluesky rich text facets if present.
    """
    urls = []

    try:
        facets = getattr(record, "facets", None) or []
        for facet in facets:
            features = getattr(facet, "features", None) or []
            for feature in features:
                uri = getattr(feature, "uri", None)
                if uri:
                    urls.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")

    return urls


def get_blob_from_url(media_url, client):
    """Download *media_url* and upload it to Bluesky; return the blob or None."""
    try:
        r = httpx.get(media_url, timeout=30, follow_redirects=True)
        if r.status_code == 200:
            return client.upload_blob(r.content).blob
    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {e}")
    return None


def get_blob_from_file(file_path, client):
    """Upload a local file to Bluesky; return the blob or None on failure."""
    try:
        with open(file_path, "rb") as f:
            return client.upload_blob(f.read()).blob
    except Exception as e:
        logging.warning(f"Could not upload local file {file_path}: {e}")
        return None


def prepare_post_text(text, tweet_url=None):
    """
    Prepare the final text exactly as it would be posted to Bluesky.
    Optionally append source tweet URL for stronger dedupe.

    FIX: truncation previously ran *after* the URL was appended, so long
    tweets lost the source URL and URL-based dedupe silently broke. Room
    for the URL suffix is now reserved before truncating the text portion.
    """
    raw_text = (text or "").strip()

    canonical_url = None
    if APPEND_SOURCE_TWEET_URL and tweet_url:
        canonical_url = canonicalize_tweet_url(tweet_url)
        if canonical_url and canonical_url in raw_text:
            canonical_url = None  # already present in the tweet body

    # Reserve space for "\n\n" + URL so the suffix survives truncation.
    reserved = (len(canonical_url) + 2) if (canonical_url and raw_text) else 0
    limit = 295 - reserved

    if len(raw_text) > limit:
        truncated = raw_text[: max(limit - 5, 0)]
        last_space = truncated.rfind(" ")
        if last_space > 0:
            raw_text = truncated[:last_space] + "..."
        else:
            raw_text = truncated + "..."

    if canonical_url:
        raw_text = f"{raw_text}\n\n{canonical_url}" if raw_text else canonical_url

    return raw_text


def normalize_post_text(text):
    """
    Normalize post text for duplicate detection.

    Collapses all whitespace to single spaces and lowercases.
    """
    if not text:
        return ""

    text = text.replace("\r", "\n")
    text = re.sub(r"\s+", " ", text).strip()
    return text.lower()


def build_media_fingerprint(tweet):
    """
    Build a deterministic media fingerprint from scraped tweet media.
    Uses media type + canonicalized/stable media URL components.
    """
    if not tweet or not tweet.media:
        return "no-media"

    parts = []

    for media in tweet.media:
        media_type = getattr(media, "type", "unknown")
        media_url = getattr(media, "media_url_https", "") or ""

        stable_value = media_url

        if media_type == "photo":
            # Strip size/format query params that vary between scrapes.
            stable_value = re.sub(r"[?&]name=\w+", "", stable_value)
            stable_value = re.sub(r"[?&]format=\w+", "", stable_value)
        elif media_type == "video":
            # Video media URLs are ephemeral; key on the tweet permalink.
            stable_value = canonicalize_tweet_url(tweet.tweet_url or media_url or "")

        parts.append(f"{media_type}:{stable_value}")

    parts.sort()
    raw = "|".join(parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def build_bsky_media_fingerprint(post_view):
    """
    Build a best-effort media fingerprint from Bluesky embed structure.
    This won't always perfectly match X source media IDs, but it gives a stable
    signature for comparison among already-posted Bluesky items.
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"

        parts = []

        images = getattr(embed, "images", None)
        if images:
            for img in images:
                image_obj = getattr(img, "image", None)
                ref = getattr(image_obj, "ref", None) or getattr(image_obj, "cid", None) or str(image_obj)
                parts.append(f"photo:{ref}")

        video = getattr(embed, "video", None)
        if video:
            ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
            parts.append(f"video:{ref}")

        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")

        if not parts:
            return "no-media"

        parts.sort()
        raw = "|".join(parts)
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"


def build_text_media_key(normalized_text, media_fingerprint):
    """Combine normalized text and media fingerprint into one dedupe key."""
    return hashlib.sha256(f"{normalized_text}||{media_fingerprint}".encode("utf-8")).hexdigest()
def get_recent_bsky_posts(client, handle, limit=30):
    """
    Fetch recent top-level Bluesky posts for duplicate detection.
    Returns a list of dicts with dedupe keys.
    """
    recent_posts = []

    try:
        timeline = client.get_author_feed(handle, limit=limit)

        for item in timeline.feed:
            try:
                # Skip reposts (they carry a "reason") and replies — only
                # original top-level posts count for dedupe.
                if item.reason is not None:
                    continue

                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue

                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)

                urls = []
                urls.extend(extract_urls_from_text(text))
                urls.extend(extract_urls_from_facets(record))

                canonical_urls = set()
                for url in urls:
                    canonical = canonicalize_tweet_url(url)
                    if canonical:
                        canonical_urls.add(canonical)

                media_fingerprint = build_bsky_media_fingerprint(item.post)
                text_media_key = build_text_media_key(normalized_text, media_fingerprint)

                recent_posts.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalized_text,
                    "canonical_urls": canonical_urls,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": text_media_key,
                    "created_at": getattr(record, "created_at", None),
                })

            except Exception as e:
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")

    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}")

    return recent_posts


def make_rich(content):
    """Build an atproto TextBuilder from *content*, turning URLs into links
    and #hashtags into tags; everything else is emitted as plain text.

    Word/line separators are re-emitted manually so the rendered text matches
    the input exactly (no trailing space/newline after the last token).
    """
    text_builder = client_utils.TextBuilder()

    def repair_url(match):
        # A URL may have been split across lines by the scraper; try gluing
        # the pieces back together and validate the result over the network.
        raw = match.group(0)

        if "\n" not in raw and "\r" not in raw:
            return re.sub(r"[…\.]+$", "", raw)

        glued = raw.replace("\n", "").replace("\r", "")
        test_url = re.sub(r"[…\.]+$", "", glued)

        if is_valid_url(test_url):
            return test_url

        # If the first fragment alone is valid, the newline was a real
        # separator — keep the original text untouched.
        parts = raw.split("\n")
        test_part0 = re.sub(r"[…\.]+$", "", parts[0])
        if is_valid_url(test_part0):
            return raw

        return test_url

    content = re.sub(r"https?://[^\ \t]+", repair_url, content.strip())
    lines = content.splitlines()

    for line_idx, line in enumerate(lines):
        if not line.strip():
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue

        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue

            if word.startswith("http://") or word.startswith("https://"):
                # Bluesky link facets; upgrade plain http to https first.
                if word.startswith("http://"):
                    word = word.replace("http://", "https://", 1)

                word = re.sub(r"[…\.]+$", "", word)
                clean_url_value = clean_url(word)

                if clean_url_value and is_valid_url(clean_url_value):
                    text_builder.link(clean_url_value, clean_url_value)
                else:
                    text_builder.text(word)

            elif word.startswith("#"):
                # Tag facet keeps the visible "#word" but strips trailing
                # punctuation from the indexed tag value.
                clean_tag = word[1:].rstrip(".,;:!?)'\"…")
                text_builder.tag(word, clean_tag)

            else:
                text_builder.text(word)

            if i < len(words) - 1:
                text_builder.text(" ")

        if line_idx < len(lines) - 1:
            text_builder.text("\n")

    return text_builder


def build_dynamic_alt(raw_text):
    """Derive an alt-text string from the tweet text (URLs stripped,
    capped at 150 chars, Catalan fallback when nothing remains)."""
    dynamic_alt = raw_text.replace("\n", " ").strip()
    dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip()

    if len(dynamic_alt) > 150:
        dynamic_alt = dynamic_alt[:147] + "..."
    elif not dynamic_alt:
        dynamic_alt = "VΓ­deo o imatge adjunta al tuit"

    return dynamic_alt


def build_video_embed(video_blob, alt_text):
    """Wrap an uploaded video blob in a Bluesky video embed.

    Returns None (with an error log) on atproto versions that predate
    AppBskyEmbedVideo.
    """
    try:
        return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None


# --- Playwright Scraping ---
def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Log into x.com with Playwright (reusing a saved session when possible)
    and scrape up to SCRAPE_TWEET_LIMIT tweets from *target_handle*.

    Returns a list of ScrapedTweet; empty list on login/scrape failure.
    Side effect: writes/refreshes twitter_browser_state.json session state.
    """
    tweets = []
    state_file = "twitter_browser_state.json"

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"]
        )
        # Fixed desktop UA so X serves the full web UI.
        clean_ua = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.7632.6 Safari/537.36"
        )

        context = None
        needs_login = True

        # First try to reuse a previously saved browser session.
        if os.path.exists(state_file):
            logging.info("βœ… Found existing browser state. Attempting to bypass login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080},
                storage_state=state_file
            )
            page = context.new_page()
            page.goto("https://x.com/home")
            time.sleep(4)

            # The compose button only renders for authenticated sessions.
            if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
                logging.info("βœ… Session is valid!")
                needs_login = False
            else:
                logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
                context.close()
                os.remove(state_file)

        if needs_login:
            logging.info("πŸš€ Launching fresh browser for automated Twitter login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080}
            )
            page = context.new_page()

            try:
                page.goto("https://x.com")
                sign_in_button = page.get_by_text("Sign in", exact=True)
                sign_in_button.wait_for(state="visible", timeout=15000)
                sign_in_button.click(force=True)

                page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000)
                logging.info(f"πŸ‘€ Entering username: {username}...")
                time.sleep(1)

                # Typed with per-key delay to look less like automation.
                username_input = page.locator('input[autocomplete="username"]')
                username_input.wait_for(state="visible", timeout=15000)
                username_input.click(force=True)
                username_input.press_sequentially(username, delay=100)

                page.locator('button:has-text("Next")').first.click(force=True)
                page.wait_for_selector(
                    'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
                    timeout=15000
                )
                time.sleep(1)

                # X sometimes interjects an email/phone verification step.
                if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
                    logging.warning("πŸ›‘οΈ Security challenge detected! Entering email/phone...")
                    page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
                    sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first
                    if sec_next.is_visible():
                        sec_next.click(force=True)
                    else:
                        page.keyboard.press("Enter")
                    page.wait_for_selector('input[name="password"]', timeout=15000)
                    time.sleep(1)

                logging.info("πŸ”‘ Entering password...")
                page.fill('input[name="password"]', password)
                page.locator('span:has-text("Log in")').first.click()

                page.wait_for_url("**/home", timeout=20000)
                time.sleep(3)

                # Persist cookies/localStorage so future runs skip login.
                context.storage_state(path=state_file)
                logging.info("βœ… Login successful. Browser state saved.")

            except Exception as e:
                take_error_screenshot(page, "login_failed")
                logging.error(f"❌ Login failed: {e}")
                browser.close()
                return []

        logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
        page = context.new_page()
        page.goto(f"https://x.com/{target_handle}")

        try:
            page.wait_for_selector("article", timeout=20000)
            time.sleep(3)

            articles = page.locator("article").all()
            logging.info(f"πŸ“Š Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...")

            for article in articles[:SCRAPE_TWEET_LIMIT]:
                try:
                    time_el = article.locator("time").first
                    if not time_el.is_visible():
                        continue

                    created_at = time_el.get_attribute("datetime")

                    # The <a> wrapping <time> is the tweet permalink.
                    tweet_url = None
                    time_link = article.locator("a:has(time)").first
                    if time_link.is_visible():
                        href = time_link.get_attribute("href")
                        if href:
                            tweet_url = f"https://x.com{href}" if href.startswith("/") else href

                    text_locator = article.locator('[data-testid="tweetText"]').first
                    text = text_locator.inner_text() if text_locator.is_visible() else ""

                    media_urls = []

                    photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
                    for img in photo_locators:
                        src = img.get_attribute("src")
                        if src:
                            # Upgrade thumbnail URLs to the large variant.
                            src = re.sub(r"&name=\w+", "&name=large", src)
                            media_urls.append((src, "photo"))

                    # Videos are only marked here; the real media URL is
                    # resolved later from the tweet page.
                    video_locators = article.locator('[data-testid="videoPlayer"]').all()
                    if video_locators:
                        media_urls.append((tweet_url or "", "video"))

                    tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url))

                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
                    continue

        except Exception as e:
            take_error_screenshot(page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")

        browser.close()
        return tweets


def extract_video_url_from_tweet_page(context, tweet_url):
    """Open *tweet_url* in a new page and sniff network responses for a
    playable media URL. HLS (.m3u8) playlists are preferred over direct MP4s;
    audio-only MP4 variants and .m4s segments are ignored.

    Returns the chosen URL or None. The page is always closed.
    """
    page = context.new_page()
    best_m3u8_url = None
    best_video_mp4_url = None

    seen_urls = set()

    def is_audio_only_mp4(url, content_type):
        # Heuristic: X serves audio renditions under /aud/ paths or with
        # mp4a codec markers / audio/* content types.
        url_l = url.lower()
        content_type_l = content_type.lower()
        return (
            "/aud/" in url_l or
            "/audio/" in url_l or
            "mp4a" in url_l or
            ("audio/" in content_type_l and "video/" not in content_type_l)
        )

    def handle_response(response):
        nonlocal best_m3u8_url, best_video_mp4_url
        try:
            url = response.url
            if url in seen_urls:
                return
            seen_urls.add(url)

            url_l = url.lower()
            content_type = response.headers.get("content-type", "")
            content_type_l = content_type.lower()

            # .m4s fragments are useless on their own.
            if ".m4s" in url_l:
                return

            if (
                ".m3u8" in url_l or
                "application/vnd.apple.mpegurl" in content_type_l or
                "application/x-mpegurl" in content_type_l
            ):
                if best_m3u8_url is None:
                    best_m3u8_url = url
                    logging.info(f"πŸ“Ί Found HLS playlist URL: {url}")
                return

            if ".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l:
                if is_audio_only_mp4(url, content_type):
                    logging.info(f"πŸ”‡ Ignoring audio-only MP4: {url}")
                    return

                if best_video_mp4_url is None:
                    best_video_mp4_url = url
                    logging.info(f"πŸŽ₯ Found VIDEO MP4 URL: {url}")
                return

        except Exception as e:
            logging.debug(f"Response parsing error: {e}")

    page.on("response", handle_response)

    def current_best():
        # HLS wins over direct MP4 when both were captured.
        return best_m3u8_url or best_video_mp4_url

    try:
        logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
        page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(3)

        player = page.locator('[data-testid="videoPlayer"]').first

        if player.count() > 0:
            try:
                player.scroll_into_view_if_needed(timeout=5000)
            except Exception:
                pass

            try:
                player.click(force=True, timeout=5000)
                logging.info("▢️ Clicked video player")
            except Exception as e:
                logging.info(f"⚠️ First player click failed: {e}")
        else:
            logging.warning("⚠️ No video player locator found on tweet page")

        # Poll up to 12s for the response sniffer to catch a media URL.
        for _ in range(12):
            if current_best():
                break
            time.sleep(1)

        # One retry: click again and nudge playback with the space key.
        if not current_best() and player.count() > 0:
            logging.info("πŸ” No media URL found yet, retrying player interaction...")
            try:
                player.click(force=True, timeout=5000)
                time.sleep(2)
            except Exception as e:
                logging.info(f"⚠️ Retry click failed: {e}")

            try:
                page.keyboard.press("Space")
                time.sleep(1)
            except Exception:
                pass

            for _ in range(8):
                if current_best():
                    break
                time.sleep(1)

        selected_url = current_best()
        if selected_url:
            logging.info(f"βœ… Selected media URL for download: {selected_url}")
        else:
            logging.warning(f"⚠️ No playable media URL detected on tweet page: {tweet_url}")

        return selected_url

    except Exception as e:
        logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}")
        return None
    finally:
        page.close()


# --- Video Processing ---
def download_and_crop_video(video_url, output_path):
    """Download *video_url* with ffmpeg (HLS or direct MP4), crop it to at
    most 59 seconds (Bluesky's video ceiling) and move it to *output_path*.

    Returns *output_path* on success, None on any failure. Temp files are
    always cleaned up.
    """
    temp_input = output_path.replace(".mp4", "_source.mp4")
    temp_output = output_path.replace(".mp4", "_cropped.mp4")

    try:
        logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}")

        video_url_l = video_url.lower()

        if ".m3u8" in video_url_l:
            logging.info("πŸ“Ί Using HLS ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        else:
            logging.info("πŸŽ₯ Using direct MP4 ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]

        # List-form argv, shell=False by default — no injection surface.
        download_result = subprocess.run(
            download_cmd,
            capture_output=True,
            text=True
        )

        if download_result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{download_result.stderr}")
            return None

        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded video source file is missing or empty.")
            return None

        logging.info(f"βœ… Video downloaded: {temp_input}")

        video_clip = VideoFileClip(temp_input)
        duration = float(video_clip.duration) if video_clip.duration else 0

        if duration <= 0:
            video_clip.close()
            logging.error("❌ Downloaded video has invalid or unknown duration.")
            return None

        end_time = min(59, duration)

        # moviepy >= 2.0 renamed subclip -> subclipped; support both.
        if hasattr(video_clip, "subclipped"):
            cropped_clip = video_clip.subclipped(0, end_time)
        else:
            cropped_clip = video_clip.subclip(0, end_time)

        cropped_clip.write_videofile(
            temp_output,
            codec="libx264",
            audio_codec="aac",
            logger=None
        )

        video_clip.close()
        cropped_clip.close()

        if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
            logging.error("❌ Cropped video output is missing or empty.")
            return None

        os.replace(temp_output, output_path)
        logging.info(f"βœ… Video cropped to 59 seconds: {output_path}")
        return output_path

    except Exception as e:
        logging.error(f"❌ Error processing video: {e}")
        return None

    finally:
        for path in [temp_input, temp_output]:
            if os.path.exists(path):
                try:
                    os.remove(path)
                except Exception:
                    pass


def candidate_matches_existing(candidate, recent_bsky_posts):
    """
    Multi-signal dedupe:
    1. canonical tweet URL
    2. text + media fingerprint
    3. normalized text only

    Returns (True, reason) on the first matching signal, else (False, None).
    """
    candidate_url = candidate["canonical_tweet_url"]
    candidate_text_media_key = candidate["text_media_key"]
    candidate_normalized_text = candidate["normalized_text"]

    for existing in recent_bsky_posts:
        existing_urls = existing["canonical_urls"]

        if candidate_url and candidate_url in existing_urls:
            return True, "tweet_url"

        if candidate_text_media_key == existing["text_media_key"]:
            return True, "text_media_fingerprint"

        if candidate_normalized_text == existing["normalized_text"]:
            return True, "normalized_text"

    return False, None


# --- Main Sync Function ---
def sync_feeds(args):
    """One full sync cycle: scrape tweets, dedupe against recent Bluesky
    posts, then post anything new (with images/video) to Bluesky.

    All failures are logged, never raised — the daemon must survive a cycle.
    """
    logging.info("πŸ”„ Starting sync cycle...")
    try:
        tweets = scrape_tweets_via_playwright(
            args.twitter_username,
            args.twitter_password,
            args.twitter_email,
            args.twitter_handle
        )

        if not tweets:
            logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
            return

        bsky_client = Client()
        bsky_client.login(args.bsky_handle, args.bsky_password)

        recent_bsky_posts = get_recent_bsky_posts(
            bsky_client,
            args.bsky_handle,
            limit=DEDUPE_BSKY_LIMIT
        )

        logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for advanced duplicate detection.")

        too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS)
        logging.info(f"πŸ•’ Will ignore tweets older than: {too_old_cutoff}")

        candidate_tweets = []

        # reversed(): scraped newest-first, but we post oldest-first.
        for tweet in reversed(tweets):
            try:
                tweet_time = arrow.get(tweet.created_on)

                if tweet_time < too_old_cutoff:
                    logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
                    continue

                prepared_text = prepare_post_text(tweet.text, tweet.tweet_url)
                normalized_text = normalize_post_text(prepared_text)

                if not normalized_text:
                    logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
                    continue

                media_fingerprint = build_media_fingerprint(tweet)
                text_media_key = build_text_media_key(normalized_text, media_fingerprint)
                canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url)

                candidate_tweets.append({
                    "tweet": tweet,
                    "tweet_time": tweet_time,
                    "raw_text": prepared_text,
                    "normalized_text": normalized_text,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": text_media_key,
                    "canonical_tweet_url": canonical_tweet_url,
                })

            except Exception as e:
                logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")

        logging.info(f"πŸ§ͺ Prepared {len(candidate_tweets)} candidate tweets for advanced dedupe comparison.")

        tweets_to_post = []
        for candidate in candidate_tweets:
            is_dup, reason = candidate_matches_existing(candidate, recent_bsky_posts)
            if is_dup:
                logging.info(f"⏭️ Skipping candidate due to duplicate match on: {reason}")
                continue
            tweets_to_post.append(candidate)

        logging.info(f"πŸ“¬ {len(tweets_to_post)} tweets remain after advanced duplicate filtering.")

        if not tweets_to_post:
            logging.info("βœ… No new tweets need posting after duplicate comparison.")
            return

        new_posts = 0
        state_file = "twitter_browser_state.json"

        # A second browser context (reusing the saved login state) is needed
        # only to resolve video URLs from tweet pages.
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"]
            )
            context_kwargs = {
                "user_agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/145.0.7632.6 Safari/537.36"
                ),
                "viewport": {"width": 1920, "height": 1080},
            }
            if os.path.exists(state_file):
                context_kwargs["storage_state"] = state_file

            context = browser.new_context(**context_kwargs)

            for candidate in tweets_to_post:
                tweet = candidate["tweet"]
                tweet_time = candidate["tweet_time"]
                raw_text = candidate["raw_text"]

                logging.info(f"πŸ“ Posting missing tweet from {tweet_time} to Bluesky...")

                rich_text = make_rich(raw_text)
                dynamic_alt = build_dynamic_alt(raw_text)

                image_embeds = []
                video_embed = None

                if tweet.media:
                    for media in tweet.media:
                        if media.type == "photo":
                            blob = get_blob_from_url(media.media_url_https, bsky_client)
                            if blob:
                                image_embeds.append(
                                    models.AppBskyEmbedImages.Image(
                                        alt=dynamic_alt,
                                        image=blob
                                    )
                                )

                        elif media.type == "video":
                            if not tweet.tweet_url:
                                logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.")
                                continue

                            temp_video_path = "temp_video.mp4"

                            try:
                                real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url)
                                if not real_video_url:
                                    logging.warning(f"⚠️ Could not resolve playable video URL for {tweet.tweet_url}")
                                    continue

                                cropped_video_path = download_and_crop_video(real_video_url, temp_video_path)
                                if not cropped_video_path:
                                    logging.warning(f"⚠️ Video download/crop failed for {tweet.tweet_url}")
                                    continue

                                video_blob = get_blob_from_file(cropped_video_path, bsky_client)
                                if not video_blob:
                                    logging.warning(f"⚠️ Video upload blob failed for {tweet.tweet_url}")
                                    continue

                                video_embed = build_video_embed(video_blob, dynamic_alt)

                            finally:
                                # Blob is already uploaded; local temp file
                                # is no longer needed.
                                if os.path.exists(temp_video_path):
                                    os.remove(temp_video_path)

                try:
                    # Video wins over images if both somehow exist.
                    if video_embed:
                        bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"])
                    elif image_embeds:
                        embed = models.AppBskyEmbedImages.Main(images=image_embeds)
                        bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
                    else:
                        bsky_client.send_post(text=rich_text, langs=["ca"])

                    # Register the fresh post locally so later candidates in
                    # this same cycle dedupe against it too.
                    recent_bsky_posts.insert(0, {
                        "uri": None,
                        "text": raw_text,
                        "normalized_text": candidate["normalized_text"],
                        "canonical_urls": {candidate["canonical_tweet_url"]} if candidate["canonical_tweet_url"] else set(),
                        "media_fingerprint": candidate["media_fingerprint"],
                        "text_media_key": candidate["text_media_key"],
                        "created_at": None,
                    })
                    recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]

                    new_posts += 1
                    logging.info(f"βœ… Posted new tweet to Bluesky: {raw_text}")
                    time.sleep(5)

                except Exception as e:
                    logging.error(f"❌ Failed to post tweet to Bluesky: {e}")

            browser.close()

        logging.info(f"βœ… Sync complete. Posted {new_posts} new updates.")

    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")


# --- Main Execution ---
def main():
    """Parse CLI args (with .env fallbacks), validate credentials and run
    a single sync cycle."""
    load_dotenv()

    parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
    parser.add_argument("--twitter-username", help="Your Twitter login username")
    parser.add_argument("--twitter-password", help="Your Twitter login password")
    parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
    parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
    parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    parser.add_argument("--bsky-password", help="Your Bluesky app password")

    args = parser.parse_args()

    # CLI flags take precedence; environment variables fill the gaps.
    args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME")
    args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD")
    args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL")
    args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE")
    args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD")
    args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username

    missing_args = []
    if not args.twitter_username:
        missing_args.append("--twitter-username")
    if not args.twitter_password:
        missing_args.append("--twitter-password")
    if not args.bsky_handle:
        missing_args.append("--bsky-handle")
    if not args.bsky_password:
        missing_args.append("--bsky-password")

    if missing_args:
        logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
        return

    logging.info(f"πŸ€– Bot started. Will check @{args.twitter_handle}")
    sync_feeds(args)
    logging.info("πŸ€– Bot finished.")


if __name__ == "__main__":
    main()