import argparse
import logging
import os
import re
import time

import arrow
import httpx
from atproto import Client, client_utils, models
from dotenv import load_dotenv
from moviepy import VideoFileClip
from playwright.sync_api import sync_playwright

# --- Logging Setup ---
LOG_PATH = "twitter2bsky.log"
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
    level=logging.INFO,
)


# --- Custom Classes to replace Tweety ---
class ScrapedMedia:
    """Minimal stand-in for a Tweety media object scraped from the DOM."""

    def __init__(self, url, media_type="photo"):
        self.type = media_type  # Type can be "photo" or "video"
        self.media_url_https = url


class ScrapedTweet:
    """Minimal stand-in for a Tweety tweet object scraped from the DOM."""

    def __init__(self, created_on, text, media_urls):
        # created_on: ISO datetime string from the <time datetime="..."> attribute.
        # media_urls: iterable of (url, media_type) tuples.
        self.created_on = created_on
        self.text = text
        self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls]


# --- 1. Playwright Scraping Logic ---
def take_error_screenshot(page, error_msg):
    """Save a timestamped PNG of the current page state for debugging *error_msg*."""
    logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    screenshot_name = f"screenshot_{timestamp}.png"
    page.screenshot(path=screenshot_name)
    logging.info(f"📸 Screenshot saved as: {screenshot_name}")


def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Logs in (or loads session) and scrapes tweets directly from the DOM.

    Returns a list of ScrapedTweet (newest first, at most 10), or [] on failure.
    Persists browser state in twitter_browser_state.json to skip future logins.
    """
    tweets = []
    state_file = "twitter_browser_state.json"

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True, args=["--disable-blink-features=AutomationControlled"]
        )
        clean_ua = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/145.0.7632.6 Safari/537.36"
        )
        context = None
        needs_login = True

        # 1. Try to load existing session
        if os.path.exists(state_file):
            logging.info("✅ Found existing browser state. Attempting to bypass login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={'width': 1920, 'height': 1080},
                storage_state=state_file,
            )
            page = context.new_page()
            page.goto("https://x.com/home")
            time.sleep(4)

            # Check if we are actually logged in (compose button only exists when
            # authenticated; logged-out visitors get redirected away from /home).
            if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
                logging.info("✅ Session is valid!")
                needs_login = False
            else:
                logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
                context.close()
                os.remove(state_file)

        # 2. Perform Login if needed
        if needs_login:
            logging.info("🚀 Launching fresh browser for automated Twitter login...")
            context = browser.new_context(
                user_agent=clean_ua, viewport={'width': 1920, 'height': 1080}
            )
            page = context.new_page()
            try:
                page.goto("https://x.com")
                sign_in_button = page.get_by_text("Sign in", exact=True)
                sign_in_button.wait_for(state="visible", timeout=15000)
                sign_in_button.click(force=True)
                page.wait_for_selector('h1:has-text("Sign in to X")', state='visible', timeout=25000)

                logging.info(f"👤 Entering username: {username}...")
                time.sleep(1)
                username_input = page.locator('input[autocomplete="username"]')
                username_input.wait_for(state="visible", timeout=15000)
                username_input.click(force=True)
                # Typing with a delay mimics a human and avoids bot detection.
                username_input.press_sequentially(username, delay=100)
                page.locator('button:has-text("Next")').first.click(force=True)

                page.wait_for_selector(
                    'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
                    timeout=15000,
                )
                time.sleep(1)

                # X sometimes interjects an "unusual activity" challenge asking for
                # the account email or phone before showing the password field.
                if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
                    logging.warning("🛡️ Security challenge detected! Entering email/phone...")
                    page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
                    sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first
                    if sec_next.is_visible():
                        sec_next.click(force=True)
                    else:
                        page.keyboard.press("Enter")
                    page.wait_for_selector('input[name="password"]', timeout=15000)
                    time.sleep(1)

                logging.info("🔑 Entering password...")
                page.fill('input[name="password"]', password)
                page.locator('span:has-text("Log in")').first.click()
                page.wait_for_url("**/home", timeout=20000)
                time.sleep(3)

                # Save state for next time
                context.storage_state(path=state_file)
                logging.info("✅ Login successful. Browser state saved.")
            except Exception as e:
                take_error_screenshot(page, "login_failed")
                logging.error(f"❌ Login failed: {e}")
                browser.close()
                return []

        # 3. Scrape the target profile
        logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
        page.goto(f"https://x.com/{target_handle}")
        try:
            page.wait_for_selector('article', timeout=20000)
            time.sleep(3)  # Let DOM settle and images load

            articles = page.locator('article').all()
            logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing...")

            for article in articles[:10]:  # Check top 10 tweets
                try:
                    # Get Time
                    time_el = article.locator('time').first
                    if not time_el.is_visible():
                        continue  # Skip ads or invalid articles
                    created_at = time_el.get_attribute('datetime')

                    # Get Text
                    text_locator = article.locator('[data-testid="tweetText"]').first
                    text = text_locator.inner_text() if text_locator.is_visible() else ""

                    # Get Media URLs
                    media_urls = []
                    photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
                    for img in photo_locators:
                        src = img.get_attribute('src')
                        if src:
                            # Request the full-resolution variant instead of the thumbnail.
                            src = re.sub(r'&name=\w+', '&name=large', src)
                            media_urls.append((src, "photo"))

                    # Get Video URLs. The src lives on the inner <video> element, not
                    # on the videoPlayer container itself.
                    # NOTE(review): X often serves blob: URLs here, which cannot be
                    # fetched over HTTP — downstream download may still fail. TODO confirm.
                    video_locators = article.locator('[data-testid="videoPlayer"] video').all()
                    for video in video_locators:
                        video_url = video.get_attribute('src')
                        if video_url:
                            media_urls.append((video_url, "video"))

                    tweets.append(ScrapedTweet(created_at, text, media_urls))
                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
                    continue
        except Exception as e:
            take_error_screenshot(page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")

        browser.close()
    return tweets


# --- 2. URL Validation Function ---
def is_valid_url(url):
    """Return True if *url* responds with any status below 500 to a HEAD request."""
    try:
        # Follow redirects and accept standard anti-bot codes (like 403) so we
        # don't discard real news links.
        response = httpx.head(url, timeout=5, follow_redirects=True)
        return response.status_code < 500
    except Exception:
        return False


# --- 3. URL Cleaning Function ---
def clean_url(url):
    """Strip whitespace and trailing ellipsis/periods; return the URL if it
    validates, else None."""
    trimmed_url = url.strip()
    cleaned_url = re.sub(r'\s+', '', trimmed_url)
    # Strip trailing ellipsis (unicode '…' or ascii '...') and trailing periods
    cleaned_url = re.sub(r'[…\.]+$', '', cleaned_url)
    if is_valid_url(cleaned_url):
        return cleaned_url
    return None
# --- 4. Video Processing ---
def download_and_crop_video(video_url, output_path):
    """Download *video_url* to *output_path* and trim it to at most 59 seconds.

    Returns *output_path* on success, None on any failure (logged, not raised).
    """
    try:
        # Download the video
        response = httpx.get(video_url, timeout=30, follow_redirects=True)
        if response.status_code != 200:
            logging.error(f"❌ Failed to download video: {video_url}")
            return None
        with open(output_path, 'wb') as f:
            f.write(response.content)
        logging.info(f"✅ Video downloaded: {output_path}")

        # Write the trimmed clip to a SEPARATE file: moviepy reads the source
        # lazily, so overwriting the file we are still reading corrupts it.
        base, ext = os.path.splitext(output_path)
        cropped_path = f"{base}_cropped{ext}"
        video_clip = VideoFileClip(output_path)
        try:
            # moviepy >= 2.0 (import style used in this file) renamed
            # subclip() to subclipped().
            cropped_clip = video_clip.subclipped(0, min(59, video_clip.duration))
            cropped_clip.write_videofile(cropped_path, codec='libx264')
            cropped_clip.close()
        finally:
            video_clip.close()
        os.replace(cropped_path, output_path)
        logging.info(f"✅ Video cropped to 59 seconds: {output_path}")
        return output_path
    except Exception as e:
        logging.error(f"❌ Error processing video: {e}")
        return None


# --- 5. Formatting & Bluesky Logic ---
def get_last_bsky(client, handle):
    """Return the creation time (arrow) of the newest original post (not a
    repost or reply) on *handle*'s feed, or epoch 0 if none exist."""
    timeline = client.get_author_feed(handle)
    for titem in timeline.feed:
        # reason is set for reposts; record.reply is set for replies.
        if titem.reason is None and getattr(titem.post.record, "reply", None) is None:
            return arrow.get(titem.post.record.created_at)
    return arrow.get(0)


def make_rich(content):
    """Build an atproto TextBuilder from raw tweet text: repair URLs broken by
    newlines, convert valid URLs to links and #hashtags to tags, and preserve
    line breaks as plain text."""
    text_builder = client_utils.TextBuilder()

    # 1. Smart URL Repair: Find URLs broken by newlines and glue them back together
    def repair_url(match):
        raw = match.group(0)
        # If there are no newlines, just strip trailing ellipsis and return
        if '\n' not in raw and '\r' not in raw:
            return re.sub(r'[…\.]+$', '', raw)
        # Try removing all newlines
        glued = raw.replace('\n', '').replace('\r', '')
        # Strip trailing ellipsis for validation
        test_url = re.sub(r'[…\.]+$', '', glued)
        # If gluing it creates a valid URL, it was definitely broken by Twitter's DOM
        if is_valid_url(test_url):
            return test_url
        # If it's NOT a valid URL, maybe the newline was supposed to be there
        # (e.g., URL\nNextParagraph)
        parts = raw.split('\n')
        test_part0 = re.sub(r'[…\.]+$', '', parts[0])
        if is_valid_url(test_part0):
            return raw  # Return original to preserve the paragraph break
        # Fallback: assume it's a broken URL and glue it anyway
        return test_url

    # This regex grabs http:// or https:// followed by any non-space characters
    # (including newlines, since only space/tab are excluded).
    content = re.sub(r'https?://[^\ \t]+', repair_url, content.strip())

    # 2. Split content into lines to preserve actual paragraph breaks
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        # Handle empty lines to preserve spacing
        if not line.strip():
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue

        # Split by space to process words, URLs, and tags
        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                # Handle double spaces gracefully
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue

            # Check and convert URLs
            if word.startswith("http://") or word.startswith("https://"):
                # Ensure the URL is converted to https if it starts with http
                if word.startswith("http://"):
                    word = word.replace("http://", "https://", 1)
                # Strip trailing ellipsis right here so it doesn't fail validation
                word = re.sub(r'[…\.]+$', '', word)
                # Clean the URL further using the clean_url function
                clean_url_value = clean_url(word)
                if clean_url_value and is_valid_url(clean_url_value):
                    text_builder.link(clean_url_value, clean_url_value)
                else:
                    text_builder.text(word)  # Add as plain text if invalid
            elif word.startswith("#"):
                clean_tag = word[1:].rstrip(".,;:!?)'\"…")
                text_builder.tag(word, clean_tag)
            else:
                text_builder.text(word)

            if i < len(words) - 1:
                text_builder.text(" ")  # Add space between words

        # Add a line break after each processed line, except the very last one
        if line_idx < len(lines) - 1:
            text_builder.text("\n")

    return text_builder


def get_blob_from_url(media_url, client):
    """Fetches and uploads the media (image or video) and returns the blob.

    Returns None if the fetch fails or responds non-200 (logged, not raised).
    """
    try:
        r = httpx.get(media_url, timeout=10)
        if r.status_code == 200:
            return client.upload_blob(r.content).blob
    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {e}")
    return None


# --- 6. Main Sync Function ---
def sync_feeds(args):
    """Run one sync cycle: scrape tweets, then post any new ones to Bluesky."""
    logging.info("🔄 Starting sync cycle...")
    try:
        # 1. Fetch Tweets via Playwright
        tweets = scrape_tweets_via_playwright(
            args.twitter_username,
            args.twitter_password,
            args.twitter_email,
            args.twitter_handle,
        )
        if not tweets:
            logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
            return

        # 2. Connect to Bluesky
        bsky_client = Client()
        bsky_client.login(args.bsky_handle, args.bsky_password)
        last_bsky_time = get_last_bsky(bsky_client, args.bsky_handle)

        # 3. Process and Post (oldest first so the Bluesky feed stays in order)
        new_posts = 0
        for tweet in reversed(tweets):
            tweet_time = arrow.get(tweet.created_on)
            # if tweet_time > last_bsky_time:  # Only post new tweets
            if True:  # For testing, post all tweets regardless of time
                logging.info(f"📝 Found new tweet from {tweet_time}. Posting to Bluesky...")
                raw_text = tweet.text.strip()

                # Smart truncation: Don't cut off in the middle of a word/URL
                if len(raw_text) > 295:
                    truncated = raw_text[:290]
                    last_space = truncated.rfind(" ")
                    if last_space > 0:
                        raw_text = truncated[:last_space] + "..."
                    else:
                        raw_text = truncated + "..."
                    logging.info("✂️ Tweet exceeded 300 characters. Truncated safely for Bluesky.")

                # Generate rich_text (This is now a TextBuilder object)
                rich_text = make_rich(raw_text)

                images = []
                if tweet.media:
                    # Generate dynamic alt text based on the tweet content:
                    # 1. Remove line breaks and extra spaces
                    dynamic_alt = raw_text.replace('\n', ' ').strip()
                    # 2. Remove URLs to keep the text clean
                    dynamic_alt = re.sub(r'https?://\S+', '', dynamic_alt).strip()
                    # 3. Truncate gracefully if it's too long
                    if len(dynamic_alt) > 150:
                        dynamic_alt = dynamic_alt[:147] + "..."
                    elif not dynamic_alt:
                        # Fallback if the tweet is literally just an image with no text
                        dynamic_alt = "Imatge adjunta al tuit"

                    for media in tweet.media:
                        if media.type == "photo":
                            blob = get_blob_from_url(media.media_url_https, bsky_client)
                            if blob:
                                images.append(models.AppBskyEmbedImages.Image(alt=dynamic_alt, image=blob))
                        elif media.type == "video":
                            # Download and crop the video to a local temp file.
                            video_path = "temp_video.mp4"
                            cropped_video_path = download_and_crop_video(media.media_url_https, video_path)
                            if cropped_video_path:
                                # Upload the LOCAL file's bytes (get_blob_from_url
                                # expects an HTTP URL, not a filesystem path).
                                # NOTE(review): embedded as an image blob; a proper
                                # video embed would use AppBskyEmbedVideo — TODO confirm.
                                try:
                                    with open(cropped_video_path, 'rb') as vf:
                                        blob = bsky_client.upload_blob(vf.read()).blob
                                    if blob:
                                        images.append(models.AppBskyEmbedImages.Image(alt=dynamic_alt, image=blob))
                                except Exception as e:
                                    logging.warning(f"Could not upload video {cropped_video_path}: {e}")
                            # Clean up the temporary video file (may not exist if
                            # the download failed before the file was written).
                            if os.path.exists(video_path):
                                os.remove(video_path)

                # 🌐 Posting with Catalan language tag
                try:
                    if images:
                        embed = models.AppBskyEmbedImages.Main(images=images)
                        bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
                    else:
                        bsky_client.send_post(text=rich_text, langs=["ca"])
                    new_posts += 1
                    logging.info(f"✅ Posted new tweet to Bluesky: {raw_text}")
                    time.sleep(5)  # Rate limit handling
                except Exception as e:
                    logging.error(f"❌ Failed to post tweet to Bluesky: {e}")

        logging.info(f"✅ Sync complete. Posted {new_posts} new updates.")
    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")


# NOTE: the original file defined download_and_crop_video a second time here
# (section 7), byte-identical to section 4; the duplicate has been removed.

# --- 8. Main Execution ---
def main():
    """Parse CLI options, fill gaps from environment variables, and run one sync cycle."""
    load_dotenv()

    parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
    parser.add_argument("--twitter-username", help="Your Twitter login username")
    parser.add_argument("--twitter-password", help="Your Twitter login password")
    parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
    parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
    parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    parser.add_argument("--bsky-password", help="Your Bluesky app password")
    args = parser.parse_args()

    # CLI flags win; otherwise fall back to the corresponding env var.
    env_fallbacks = {
        "twitter_username": "TWITTER_USERNAME",
        "twitter_password": "TWITTER_PASSWORD",
        "twitter_email": "TWITTER_EMAIL",
        "bsky_handle": "BSKY_HANDLE",
        "bsky_password": "BSKY_APP_PASSWORD",
    }
    for attr, env_name in env_fallbacks.items():
        if not getattr(args, attr):
            setattr(args, attr, os.getenv(env_name))
    # The scrape target defaults to the login username when not given anywhere.
    args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username

    # Email and handle are optional; these four are hard requirements.
    required = (
        ("twitter_username", "--twitter-username"),
        ("twitter_password", "--twitter-password"),
        ("bsky_handle", "--bsky-handle"),
        ("bsky_password", "--bsky-password"),
    )
    missing_args = [flag for attr, flag in required if not getattr(args, attr)]
    if missing_args:
        logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
        return

    logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
    sync_feeds(args)
    logging.info("🤖 Bot finished.")


if __name__ == "__main__":
    main()