From 782d1bd1495fec461217a6c8d2d77aec3ef5d749 Mon Sep 17 00:00:00 2001 From: Guillem Hernandez Sola Date: Mon, 30 Mar 2026 17:34:36 +0200 Subject: [PATCH] Added new yml --- twitter2bsky_daemon.py | 659 ++++++++++++++++++++++++----------------- 1 file changed, 382 insertions(+), 277 deletions(-) diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index a73d6f5..eec393b 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -18,19 +18,22 @@ logging.basicConfig( level=logging.INFO, ) -# --- Custom Classes to replace Tweety --- +# --- Custom Classes --- class ScrapedMedia: def __init__(self, url, media_type="photo"): - self.type = media_type # Type can be "photo" or "video" + self.type = media_type self.media_url_https = url + class ScrapedTweet: - def __init__(self, created_on, text, media_urls): + def __init__(self, created_on, text, media_urls, tweet_url=None): self.created_on = created_on self.text = text + self.tweet_url = tweet_url self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls] -# --- 1. Playwright Scraping Logic --- + +# --- Helpers --- def take_error_screenshot(page, error_msg): logging.info(f"πŸ“Έ Taking screenshot... Shot: {error_msg}") timestamp = time.strftime("%Y%m%d_%H%M%S") @@ -38,31 +41,163 @@ def take_error_screenshot(page, error_msg): page.screenshot(path=screenshot_name) logging.info(f"πŸ“Έ Screenshot saved as: {screenshot_name}") + +def is_valid_url(url): + try: + response = httpx.head(url, timeout=5, follow_redirects=True) + return response.status_code < 500 + except Exception: + return False + + +def clean_url(url): + trimmed_url = url.strip() + cleaned_url = re.sub(r"\s+", "", trimmed_url) + cleaned_url = re.sub(r"[…\.]+$", "", cleaned_url) + + if is_valid_url(cleaned_url): + return cleaned_url + return None + + +def get_blob_from_url(media_url, client): + """Fetches remote media and uploads it to Bluesky.""" + try: + r = httpx.get(media_url, timeout=30, follow_redirects=True) + if r.status_code == 200: + return client.upload_blob(r.content).blob + except Exception as e: + logging.warning(f"Could not fetch media {media_url}: {e}") + return None + + +def get_blob_from_file(file_path, client): + """Uploads a local file to Bluesky.""" + try: + with open(file_path, "rb") as f: + return client.upload_blob(f.read()).blob + except Exception as e: + logging.warning(f"Could not upload local file {file_path}: {e}") + return None + + +def get_last_bsky(client, handle): + timeline = client.get_author_feed(handle) + for titem in timeline.feed: + if titem.reason is None and getattr(titem.post.record, "reply", None) is None: + return arrow.get(titem.post.record.created_at) + return arrow.get(0) + + +def make_rich(content): + text_builder = client_utils.TextBuilder() + + def repair_url(match): + raw = match.group(0) + + if "\n" not in raw and "\r" not in raw: + return re.sub(r"[…\.]+$", "", raw) + + glued = raw.replace("\n", "").replace("\r", "") + test_url = re.sub(r"[…\.]+$", "", glued) + + if is_valid_url(test_url): + return test_url + + parts = raw.split("\n") + test_part0 = re.sub(r"[…\.]+$", "", parts[0]) + if is_valid_url(test_part0): + return raw + + return test_url + + content = re.sub(r"https?://[^\ \t]+", repair_url, content.strip()) + lines = content.splitlines() + + for line_idx, line in enumerate(lines): + if not line.strip(): + if line_idx < len(lines) - 1: + text_builder.text("\n") + continue + + words = line.split(" ") + for i, word in enumerate(words): + if not word: + if i < len(words) - 1: + text_builder.text(" ") + continue + + if word.startswith("http://") or word.startswith("https://"): + if word.startswith("http://"): + word = word.replace("http://", "https://", 1) + + word = re.sub(r"[…\.]+$", "", word) + clean_url_value = clean_url(word) + + if clean_url_value and is_valid_url(clean_url_value): + text_builder.link(clean_url_value, clean_url_value) + else: + text_builder.text(word) + + elif word.startswith("#"): + clean_tag = word[1:].rstrip(".,;:!?)'\"…") + text_builder.tag(word, clean_tag) + + else: + text_builder.text(word) + + if i < len(words) - 1: + text_builder.text(" ") + + if line_idx < len(lines) - 1: + text_builder.text("\n") + + return text_builder + + +def build_dynamic_alt(raw_text): + dynamic_alt = raw_text.replace("\n", " ").strip() + dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip() + + if len(dynamic_alt) > 150: + dynamic_alt = dynamic_alt[:147] + "..." + elif not dynamic_alt: + dynamic_alt = "VΓ­deo o imatge adjunta al tuit" + + return dynamic_alt + + +# --- Playwright Scraping --- def scrape_tweets_via_playwright(username, password, email, target_handle): """Logs in (or loads session) and scrapes tweets directly from the DOM.""" tweets = [] state_file = "twitter_browser_state.json" - + with sync_playwright() as p: - browser = p.chromium.launch(headless=True, args=["--disable-blink-features=AutomationControlled"]) - clean_ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.7632.6 Safari/537.36" - + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"] + ) + clean_ua = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ) + context = None needs_login = True - - # 1. Try to load existing session + if os.path.exists(state_file): logging.info("βœ… Found existing browser state. Attempting to bypass login...") context = browser.new_context( user_agent=clean_ua, - viewport={'width': 1920, 'height': 1080}, + viewport={"width": 1920, "height": 1080}, storage_state=state_file ) page = context.new_page() page.goto("https://x.com/home") time.sleep(4) - - # Check if we are actually logged in + if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url: logging.info("βœ… Session is valid!") needs_login = False @@ -70,35 +205,37 @@ def scrape_tweets_via_playwright(username, password, email, target_handle): logging.warning("⚠️ Saved session expired or invalid. Re-logging in...") context.close() os.remove(state_file) - - # 2. Perform Login if needed + if needs_login: logging.info("πŸš€ Launching fresh browser for automated Twitter login...") context = browser.new_context( user_agent=clean_ua, - viewport={'width': 1920, 'height': 1080} + viewport={"width": 1920, "height": 1080} ) page = context.new_page() - + try: page.goto("https://x.com") sign_in_button = page.get_by_text("Sign in", exact=True) sign_in_button.wait_for(state="visible", timeout=15000) sign_in_button.click(force=True) - - page.wait_for_selector('h1:has-text("Sign in to X")', state='visible', timeout=25000) + + page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000) logging.info(f"πŸ‘€ Entering username: {username}...") time.sleep(1) - + username_input = page.locator('input[autocomplete="username"]') username_input.wait_for(state="visible", timeout=15000) username_input.click(force=True) - username_input.press_sequentially(username, delay=100) - + username_input.press_sequentially(username, delay=100) + page.locator('button:has-text("Next")').first.click(force=True) - page.wait_for_selector('input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]', timeout=15000) + page.wait_for_selector( + 'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]', + timeout=15000 + ) time.sleep(1) - + if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible(): logging.warning("πŸ›‘οΈ Security challenge detected! Entering email/phone...") page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email) @@ -113,250 +250,223 @@ def scrape_tweets_via_playwright(username, password, email, target_handle): logging.info("πŸ”‘ Entering password...") page.fill('input[name="password"]', password) page.locator('span:has-text("Log in")').first.click() - + page.wait_for_url("**/home", timeout=20000) time.sleep(3) - - # Save state for next time + context.storage_state(path=state_file) logging.info("βœ… Login successful. Browser state saved.") - + except Exception as e: take_error_screenshot(page, "login_failed") logging.error(f"❌ Login failed: {e}") browser.close() return [] - # 3. Scrape the target profile logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...") + page = context.new_page() page.goto(f"https://x.com/{target_handle}") - + try: - page.wait_for_selector('article', timeout=20000) - time.sleep(3) # Let DOM settle and images load - - articles = page.locator('article').all() + page.wait_for_selector("article", timeout=20000) + time.sleep(3) + + articles = page.locator("article").all() logging.info(f"πŸ“Š Found {len(articles)} tweets on screen. Parsing...") - - for article in articles[:10]: # Check top 10 tweets + + for article in articles[:10]: try: - # Get Time - time_el = article.locator('time').first + time_el = article.locator("time").first if not time_el.is_visible(): - continue # Skip ads or invalid articles - created_at = time_el.get_attribute('datetime') - - # Get Text + continue + + created_at = time_el.get_attribute("datetime") + + tweet_url = None + time_link = article.locator("a:has(time)").first + if time_link.is_visible(): + href = time_link.get_attribute("href") + if href: + tweet_url = f"https://x.com{href}" if href.startswith("/") else href + text_locator = article.locator('[data-testid="tweetText"]').first text = text_locator.inner_text() if text_locator.is_visible() else "" - - # Get Media URLs + media_urls = [] + photo_locators = article.locator('[data-testid="tweetPhoto"] img').all() for img in photo_locators: - src = img.get_attribute('src') + src = img.get_attribute("src") if src: - src = re.sub(r'&name=\w+', '&name=large', src) + src = re.sub(r"&name=\w+", "&name=large", src) media_urls.append((src, "photo")) - - # Get Video URLs + video_locators = article.locator('[data-testid="videoPlayer"]').all() - for video in video_locators: - video_url = video.get_attribute('src') - if video_url: - media_urls.append((video_url, "video")) - - tweets.append(ScrapedTweet(created_at, text, media_urls)) - + if video_locators: + media_urls.append((tweet_url or "", "video")) + + tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url)) + except Exception as e: logging.warning(f"⚠️ Failed to parse a specific tweet: {e}") continue - + except Exception as e: take_error_screenshot(page, "scrape_failed") logging.error(f"❌ Failed to scrape profile: {e}") - + browser.close() return tweets -# --- 2. URL Validation Function --- -def is_valid_url(url): + +def extract_video_url_from_tweet_page(context, tweet_url): + """ + Opens a tweet page and captures the first real MP4 video request. + """ + page = context.new_page() + found_video_url = None + + def handle_response(response): + nonlocal found_video_url + try: + url = response.url + content_type = response.headers.get("content-type", "") + + if found_video_url: + return + + if ".mp4" in url or "video/mp4" in content_type: + found_video_url = url + logging.info(f"πŸŽ₯ Found video URL: {url}") + except Exception: + pass + + page.on("response", handle_response) + try: - # Follow redirects and accept standard anti-bot codes (like 403) so we don't discard real news links - response = httpx.head(url, timeout=5, follow_redirects=True) - return response.status_code < 500 - except Exception: - return False + logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}") + page.goto(tweet_url, wait_until="networkidle", timeout=30000) + time.sleep(5) -# --- 3. URL Cleaning Function --- -def clean_url(url): - trimmed_url = url.strip() - cleaned_url = re.sub(r'\s+', '', trimmed_url) - - # Strip trailing ellipsis (unicode '…' or ascii '...') and trailing periods - cleaned_url = re.sub(r'[…\.]+$', '', cleaned_url) - - if is_valid_url(cleaned_url): - return cleaned_url - return None + video_player = page.locator('[data-testid="videoPlayer"]').first + if video_player.count() > 0: + try: + video_player.click(force=True, timeout=3000) + time.sleep(3) + except Exception: + pass -# --- 4. Video Processing --- + return found_video_url + + except Exception as e: + logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}") + return None + finally: + page.close() + + +# --- Video Processing --- def download_and_crop_video(video_url, output_path): - """Downloads the video and crops it to 59 seconds.""" + """Downloads the video and crops it to max 59 seconds.""" try: - # Download the video - response = httpx.get(video_url, timeout=10) - if response.status_code == 200: - with open(output_path, 'wb') as f: - f.write(response.content) - logging.info(f"βœ… Video downloaded: {output_path}") - - # Crop the video to 59 seconds - video_clip = VideoFileClip(output_path) - cropped_clip = video_clip.subclip(0, min(59, video_clip.duration)) - cropped_clip.write_videofile(output_path, codec='libx264') - logging.info(f"βœ… Video cropped to 59 seconds: {output_path}") - return output_path - else: - logging.error(f"❌ Failed to download video: {video_url}") + response = httpx.get(video_url, timeout=60, follow_redirects=True) + if response.status_code != 200: + logging.error(f"❌ Failed to download video: {video_url} (status {response.status_code})") return None + + with open(output_path, "wb") as f: + f.write(response.content) + logging.info(f"βœ… Video downloaded: {output_path}") + + video_clip = VideoFileClip(output_path) + end_time = min(59, float(video_clip.duration)) + + if hasattr(video_clip, "subclipped"): + cropped_clip = video_clip.subclipped(0, end_time) + else: + cropped_clip = video_clip.subclip(0, end_time) + + temp_output = output_path.replace(".mp4", "_cropped.mp4") + cropped_clip.write_videofile( + temp_output, + codec="libx264", + audio_codec="aac", + logger=None + ) + + video_clip.close() + cropped_clip.close() + + os.replace(temp_output, output_path) + logging.info(f"βœ… Video cropped to 59 seconds: {output_path}") + return output_path + except Exception as e: logging.error(f"❌ Error processing video: {e}") return None -# --- 5. Formatting & Bluesky Logic --- -def get_last_bsky(client, handle): - timeline = client.get_author_feed(handle) - for titem in timeline.feed: - if titem.reason is None and getattr(titem.post.record, "reply", None) is None: - return arrow.get(titem.post.record.created_at) - return arrow.get(0) -def make_rich(content): - text_builder = client_utils.TextBuilder() - - # 1. Smart URL Repair: Find URLs broken by newlines and glue them back together - def repair_url(match): - raw = match.group(0) - - # If there are no newlines, just strip trailing ellipsis and return - if '\n' not in raw and '\r' not in raw: - return re.sub(r'[…\.]+$', '', raw) - - # Try removing all newlines - glued = raw.replace('\n', '').replace('\r', '') - - # Strip trailing ellipsis for validation - test_url = re.sub(r'[…\.]+$', '', glued) - - # If gluing it creates a valid URL, it was definitely broken by Twitter's DOM - if is_valid_url(test_url): - return test_url - - # If it's NOT a valid URL, maybe the newline was supposed to be there (e.g., URL\nNextParagraph) - parts = raw.split('\n') - test_part0 = re.sub(r'[…\.]+$', '', parts[0]) - if is_valid_url(test_part0): - return raw # Return original to preserve the paragraph break - - # Fallback: assume it's a broken URL and glue it anyway - return test_url - - # This regex grabs http:// or https:// followed by any non-space characters (including newlines) - content = re.sub(r'https?://[^\ \t]+', repair_url, content.strip()) - - # 2. Split content into lines to preserve actual paragraph breaks - lines = content.splitlines() - - for line_idx, line in enumerate(lines): - # Handle empty lines to preserve spacing - if not line.strip(): - if line_idx < len(lines) - 1: - text_builder.text("\n") - continue - - # Split by space to process words, URLs, and tags - words = line.split(" ") - for i, word in enumerate(words): - if not word: # Handle double spaces gracefully - if i < len(words) - 1: - text_builder.text(" ") - continue - - # Check and convert URLs - if word.startswith("http://") or word.startswith("https://"): - # Ensure the URL is converted to https if it starts with http - if word.startswith("http://"): - word = word.replace("http://", "https://", 1) - - # Strip trailing ellipsis right here so it doesn't fail validation - word = re.sub(r'[…\.]+$', '', word) - - # Clean the URL further using the clean_url function - clean_url_value = clean_url(word) - if clean_url_value and is_valid_url(clean_url_value): - text_builder.link(clean_url_value, clean_url_value) - else: - text_builder.text(word) # Add as plain text if invalid - elif word.startswith("#"): - clean_tag = word[1:].rstrip(".,;:!?)'\"…") - text_builder.tag(word, clean_tag) - else: - text_builder.text(word) - - if i < len(words) - 1: - text_builder.text(" ") # Add space between words - - # Add a line break after each processed line, except the very last one - if line_idx < len(lines) - 1: - text_builder.text("\n") - - return text_builder - -def get_blob_from_url(media_url, client): - """Fetches and uploads the media (image or video) and returns the blob.""" +def build_video_embed(video_blob, alt_text): try: - r = httpx.get(media_url, timeout=10) - if r.status_code == 200: - return client.upload_blob(r.content).blob - except Exception as e: - logging.warning(f"Could not fetch media {media_url}: {e}") - return None + return models.AppBskyEmbedVideo.Main( + video=video_blob, + alt=alt_text + ) + except AttributeError: + logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.") + return None -# --- 6. Main Sync Function --- + +# --- Main Sync Function --- def sync_feeds(args): logging.info("πŸ”„ Starting sync cycle...") try: - # 1. Fetch Tweets via Playwright tweets = scrape_tweets_via_playwright( - args.twitter_username, - args.twitter_password, - args.twitter_email, + args.twitter_username, + args.twitter_password, + args.twitter_email, args.twitter_handle ) - + if not tweets: logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.") return - - # 2. Connect to Bluesky + bsky_client = Client() bsky_client.login(args.bsky_handle, args.bsky_password) last_bsky_time = get_last_bsky(bsky_client, args.bsky_handle) - - # 3. Process and Post - new_posts = 0 - for tweet in reversed(tweets): - tweet_time = arrow.get(tweet.created_on) - - if tweet_time > last_bsky_time: # Only post new tweets - #if True: # For testing, post all tweets regardless of time + new_posts = 0 + state_file = "twitter_browser_state.json" + + with sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"] + ) + context_kwargs = { + "user_agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ), + "viewport": {"width": 1920, "height": 1080}, + } + if os.path.exists(state_file): + context_kwargs["storage_state"] = state_file + + context = browser.new_context(**context_kwargs) + + for tweet in reversed(tweets): + tweet_time = arrow.get(tweet.created_on) + + if tweet_time <= last_bsky_time: + continue + logging.info(f"πŸ“ Found new tweet from {tweet_time}. Posting to Bluesky...") - + raw_text = tweet.text.strip() - - # Smart truncation: Don't cut off in the middle of a word/URL + if len(raw_text) > 295: truncated = raw_text[:290] last_space = truncated.rfind(" ") @@ -365,113 +475,107 @@ def sync_feeds(args): else: raw_text = truncated + "..." logging.info("βœ‚οΈ Tweet exceeded 300 characters. Truncated safely for Bluesky.") - - # Generate rich_text (This is now a TextBuilder object) + rich_text = make_rich(raw_text) + dynamic_alt = build_dynamic_alt(raw_text) + + image_embeds = [] + video_embed = None - images = [] if tweet.media: - # --- NEW: Generate dynamic alt text based on the tweet content --- - # 1. Remove line breaks and extra spaces - dynamic_alt = raw_text.replace('\n', ' ').strip() - # 2. Remove URLs to keep the text clean - dynamic_alt = re.sub(r'https?://\S+', '', dynamic_alt).strip() - - # 3. Truncate gracefully if it's too long - if len(dynamic_alt) > 150: - dynamic_alt = dynamic_alt[:147] + "..." - elif not dynamic_alt: - # Fallback if the tweet is literally just an image with no text - dynamic_alt = "Imatge adjunta al tuit" - # ----------------------------------------------------------------- - for media in tweet.media: if media.type == "photo": blob = get_blob_from_url(media.media_url_https, bsky_client) if blob: - # Inject our dynamic alt text here! - images.append(models.AppBskyEmbedImages.Image(alt=dynamic_alt, image=blob)) - elif media.type == "video": - # Download and crop the video - video_path = "temp_video.mp4" - cropped_video_path = download_and_crop_video(media.media_url_https, video_path) - if cropped_video_path: - blob = get_blob_from_url(cropped_video_path, bsky_client) - if blob: - images.append(models.AppBskyEmbedImages.Image(alt=dynamic_alt, image=blob)) - os.remove(video_path) # Clean up the temporary video file + image_embeds.append( + models.AppBskyEmbedImages.Image( + alt=dynamic_alt, + image=blob + ) + ) + + elif media.type == "video": + if not tweet.tweet_url: + logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.") + continue + + temp_video_path = "temp_video.mp4" + + try: + real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url) + if not real_video_url: + logging.warning(f"⚠️ Could not resolve real video URL for {tweet.tweet_url}") + continue + + cropped_video_path = download_and_crop_video(real_video_url, temp_video_path) + if not cropped_video_path: + continue + + video_blob = get_blob_from_file(cropped_video_path, bsky_client) + if not video_blob: + continue + + video_embed = build_video_embed(video_blob, dynamic_alt) + + finally: + if os.path.exists(temp_video_path): + os.remove(temp_video_path) - # 🌐 Posting with Catalan language tag try: - if images: - embed = models.AppBskyEmbedImages.Main(images=images) + if video_embed: + bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"]) + elif image_embeds: + embed = models.AppBskyEmbedImages.Main(images=image_embeds) bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"]) else: bsky_client.send_post(text=rich_text, langs=["ca"]) - + new_posts += 1 logging.info(f"βœ… Posted new tweet to Bluesky: {raw_text}") - time.sleep(5) # Rate limit handling + time.sleep(5) + except Exception as e: logging.error(f"❌ Failed to post tweet to Bluesky: {e}") - + + browser.close() + logging.info(f"βœ… Sync complete. Posted {new_posts} new updates.") - + except Exception as e: logging.error(f"❌ Error during sync cycle: {e}") -# --- 7. Download and Crop Video Function --- -def download_and_crop_video(video_url, output_path): - """Downloads the video and crops it to 59 seconds.""" - try: - # Download the video - response = httpx.get(video_url, timeout=10) - if response.status_code == 200: - with open(output_path, 'wb') as f: - f.write(response.content) - logging.info(f"βœ… Video downloaded: {output_path}") - # Crop the video to 59 seconds - video_clip = VideoFileClip(output_path) - cropped_clip = video_clip.subclip(0, min(59, video_clip.duration)) - cropped_clip.write_videofile(output_path, codec='libx264') - logging.info(f"βœ… Video cropped to 59 seconds: {output_path}") - return output_path - else: - logging.error(f"❌ Failed to download video: {video_url}") - return None - except Exception as e: - logging.error(f"❌ Error processing video: {e}") - return None - -# --- 8. Main Execution --- +# --- Main Execution --- def main(): load_dotenv() - + parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync") - + parser.add_argument("--twitter-username", help="Your Twitter login username") parser.add_argument("--twitter-password", help="Your Twitter login password") parser.add_argument("--twitter-email", help="Your Twitter email for security challenges") parser.add_argument("--twitter-handle", help="The Twitter account to scrape") parser.add_argument("--bsky-handle", help="Your Bluesky handle") parser.add_argument("--bsky-password", help="Your Bluesky app password") - + args = parser.parse_args() - + args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME") args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD") args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL") args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") - args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username - + missing_args = [] - if not args.twitter_username: missing_args.append("--twitter-username") - if not args.twitter_password: missing_args.append("--twitter-password") - if not args.bsky_handle: missing_args.append("--bsky-handle") - if not args.bsky_password: missing_args.append("--bsky-password") + if not args.twitter_username: + missing_args.append("--twitter-username") + if not args.twitter_password: + missing_args.append("--twitter-password") + if not args.bsky_handle: + missing_args.append("--bsky-handle") + if not args.bsky_password: + missing_args.append("--bsky-password") if missing_args: logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}") @@ -479,7 +583,8 @@ def main(): logging.info(f"πŸ€– Bot started. Will check @{args.twitter_handle}") sync_feeds(args) - logging.info(f"πŸ€– Bot finished.") + logging.info("πŸ€– Bot finished.") + if __name__ == "__main__": main() \ No newline at end of file