import argparse import arrow import hashlib import json import logging import re import httpx import time import os import subprocess from urllib.parse import urlparse from dotenv import load_dotenv from atproto import Client, client_utils, models from playwright.sync_api import sync_playwright from moviepy import VideoFileClip from bs4 import BeautifulSoup # --- Configuration --- LOG_PATH = "twitter2bsky.log" STATE_PATH = "twitter2bsky_state.json" SCRAPE_TWEET_LIMIT = 30 DEDUPE_BSKY_LIMIT = 30 TWEET_MAX_AGE_DAYS = 3 BSKY_TEXT_MAX_LENGTH = 275 # Video handling notes: # - Bluesky video support is constrained not just by duration, but also by # practical upload limits like final file size, bitrate, resolution, and # server-side proxy/PDS body-size caps. # - Custom PDSes such as eurosky.social may accept images fine but fail on # larger video blob uploads. # - The safest approach is to: # 1. cap duration # 2. compress aggressively # 3. log final file size # 4. skip obviously too-large uploads VIDEO_MAX_DURATION_SECONDS = 179 MAX_VIDEO_UPLOAD_SIZE_MB = 45 BSKY_BLOB_UPLOAD_MAX_RETRIES = 5 BSKY_BLOB_UPLOAD_BASE_DELAY = 10 BSKY_BLOB_UPLOAD_MAX_DELAY = 300 MEDIA_DOWNLOAD_TIMEOUT = 30 LINK_METADATA_TIMEOUT = 10 DEFAULT_BSKY_BASE_URL = "https://bsky.social" # --- Logging Setup --- logging.basicConfig( format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()], level=logging.INFO, ) # --- Custom Classes --- class ScrapedMedia: def __init__(self, url, media_type="photo"): self.type = media_type self.media_url_https = url class ScrapedTweet: def __init__(self, created_on, text, media_urls, tweet_url=None): self.created_on = created_on self.text = text self.tweet_url = tweet_url self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls] # --- Helpers --- def take_error_screenshot(page, error_msg): logging.info(f"๐Ÿ“ธ Taking screenshot... Shot: {error_msg}") timestamp = time.strftime("%Y%m%d_%H%M%S") screenshot_name = f"screenshot_{timestamp}.png" page.screenshot(path=screenshot_name) logging.info(f"๐Ÿ“ธ Screenshot saved as: {screenshot_name}") def is_valid_url(url): try: response = httpx.head(url, timeout=5, follow_redirects=True) return response.status_code < 500 except Exception: return False def clean_url(url): trimmed_url = url.strip() cleaned_url = re.sub(r"\s+", "", trimmed_url) cleaned_url = re.sub(r"[โ€ฆ\.]+$", "", cleaned_url) if is_valid_url(cleaned_url): return cleaned_url return None def canonicalize_url(url): if not url: return None return url.strip() def canonicalize_tweet_url(url): """ Canonicalize x.com/twitter.com status URLs for internal dedupe only. """ if not url: return None url = url.strip() match = re.search(r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", url, re.IGNORECASE) if not match: return url.lower() handle = match.group(1).lower() tweet_id = match.group(2) return f"https://x.com/{handle}/status/{tweet_id}" def is_x_or_twitter_domain(url): try: hostname = (urlparse(url).hostname or "").lower() return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"} except Exception: return False def extract_urls_from_text(text): if not text: return [] return re.findall(r"https?://[^\s]+", text) def extract_non_x_urls_from_text(text): urls = extract_urls_from_text(text) result = [] for url in urls: cleaned = re.sub(r"[โ€ฆ\.]+$", "", url.strip()) if cleaned and not is_x_or_twitter_domain(cleaned): result.append(cleaned) return result def extract_urls_from_facets(record): """ Extract link URLs from Bluesky rich text facets if present. """ urls = [] try: facets = getattr(record, "facets", None) or [] for facet in facets: features = getattr(facet, "features", None) or [] for feature in features: uri = getattr(feature, "uri", None) if uri: urls.append(uri) except Exception as e: logging.debug(f"Could not extract facet URLs: {e}") return urls def get_rate_limit_wait_seconds(error_obj, default_delay): """ Try to extract a sensible wait time from atproto/http error objects. """ try: headers = getattr(error_obj, "headers", None) if headers: reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") if reset_value: now_ts = int(time.time()) reset_ts = int(reset_value) wait_seconds = max(reset_ts - now_ts + 1, default_delay) return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY) except Exception: pass return default_delay def upload_blob_with_retry(client, binary_data, media_label="media"): """ Retry Bluesky blob upload when rate-limited. Diagnostic note: On alternate PDSes, large video uploads may fail for reasons other than 429 rate limits. In those cases we log the exception more explicitly and return None so the caller can degrade gracefully. """ last_exception = None for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1): try: result = client.upload_blob(binary_data) return result.blob except Exception as e: last_exception = e error_text = str(e) is_rate_limited = "429" in error_text or "RateLimitExceeded" in error_text if not is_rate_limited: logging.warning(f"Could not upload {media_label}: {repr(e)}") if hasattr(e, "response") and e.response is not None: try: logging.warning(f"Upload response status: {e.response.status_code}") logging.warning(f"Upload response body: {e.response.text}") except Exception: pass return None backoff_delay = min( BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), BSKY_BLOB_UPLOAD_MAX_DELAY ) wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: logging.warning( f"โณ Bluesky blob upload rate-limited for {media_label}. " f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s." ) time.sleep(wait_seconds) else: logging.warning( f"โŒ Exhausted blob upload retries for {media_label} after rate limiting: {repr(e)}" ) logging.warning(f"Could not upload {media_label}: {repr(last_exception)}") return None def get_blob_from_url(media_url, client, http_client): """ Download media and upload to Bluesky with retry support for upload rate limits. """ try: r = http_client.get(media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True) if r.status_code != 200: logging.warning(f"Could not fetch media {media_url}: HTTP {r.status_code}") return None content = r.content if not content: logging.warning(f"Could not fetch media {media_url}: empty response body") return None return upload_blob_with_retry(client, content, media_label=media_url) except Exception as e: logging.warning(f"Could not fetch media {media_url}: {repr(e)}") return None def get_blob_from_file(file_path, client): """ Upload a local file as a Bluesky blob. Diagnostic notes: - We log the final file size because this is often the real reason a custom PDS rejects video uploads. - Self-hosted or alternate services may have stricter proxy/body-size limits than bsky.social. """ try: if not os.path.exists(file_path): logging.warning(f"Could not upload local file {file_path}: file does not exist") return None file_size = os.path.getsize(file_path) file_size_mb = file_size / (1024 * 1024) logging.info(f"๐Ÿ“ฆ Uploading local file {file_path} ({file_size_mb:.2f} MB)") if file_path.lower().endswith(".mp4") and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB: logging.warning( f"Could not upload local file {file_path}: " f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)" ) return None with open(file_path, "rb") as f: binary_data = f.read() return upload_blob_with_retry(client, binary_data, media_label=file_path) except Exception as e: logging.warning(f"Could not upload local file {file_path}: {repr(e)}") if hasattr(e, "response") and e.response is not None: try: logging.warning(f"Upload response status: {e.response.status_code}") logging.warning(f"Upload response body: {e.response.text}") except Exception: pass return None def fetch_link_metadata(url, http_client): """ Fetch metadata used to build a Bluesky external link card. """ try: r = http_client.get(url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") title = (soup.find("meta", property="og:title") or soup.find("title")) desc = ( soup.find("meta", property="og:description") or soup.find("meta", attrs={"name": "description"}) ) image = ( soup.find("meta", property="og:image") or soup.find("meta", attrs={"name": "twitter:image"}) ) return { "title": title["content"] if title and title.has_attr("content") else (title.text.strip() if title and title.text else ""), "description": desc["content"] if desc and desc.has_attr("content") else "", "image": image["content"] if image and image.has_attr("content") else None, } except Exception as e: logging.warning(f"Could not fetch link metadata for {url}: {repr(e)}") return {} def build_external_link_embed(url, client, http_client, fallback_title="Link"): """ Build a Bluesky external embed from a URL. This should only be used when the post has no image/video embed, because Bluesky posts can only carry one embed type. """ link_metadata = fetch_link_metadata(url, http_client) thumb_blob = None if link_metadata.get("image"): thumb_blob = get_blob_from_url(link_metadata["image"], client, http_client) if link_metadata.get("title") or link_metadata.get("description") or thumb_blob: return models.AppBskyEmbedExternal.Main( external=models.AppBskyEmbedExternal.External( uri=url, title=link_metadata.get("title") or fallback_title, description=link_metadata.get("description") or "", thumb=thumb_blob, ) ) return None def prepare_post_text(text): """ Prepare the final public text exactly as it should be posted to Bluesky. Does NOT append the source X URL. Enforces the Bluesky text limit. """ raw_text = (text or "").strip() if len(raw_text) > BSKY_TEXT_MAX_LENGTH: truncated = raw_text[:BSKY_TEXT_MAX_LENGTH - 3] last_space = truncated.rfind(" ") if last_space > 0: raw_text = truncated[:last_space] + "..." else: raw_text = truncated + "..." return raw_text def normalize_post_text(text): """ Normalize post text for duplicate detection. """ if not text: return "" text = text.replace("\r", "\n") text = re.sub(r"\s+", " ", text).strip() return text.lower() def build_media_fingerprint(tweet): """ Build a deterministic media fingerprint from scraped tweet media. """ if not tweet or not tweet.media: return "no-media" parts = [] for media in tweet.media: media_type = getattr(media, "type", "unknown") media_url = getattr(media, "media_url_https", "") or "" stable_value = media_url if media_type == "photo": stable_value = re.sub(r"[?&]name=\w+", "", stable_value) stable_value = re.sub(r"[?&]format=\w+", "", stable_value) elif media_type == "video": stable_value = canonicalize_tweet_url(tweet.tweet_url or media_url or "") parts.append(f"{media_type}:{stable_value}") parts.sort() raw = "|".join(parts) return hashlib.sha256(raw.encode("utf-8")).hexdigest() def build_bsky_media_fingerprint(post_view): """ Best-effort media fingerprint from Bluesky embed structure. """ try: embed = getattr(post_view, "embed", None) if not embed: return "no-media" parts = [] images = getattr(embed, "images", None) if images: for img in images: image_obj = getattr(img, "image", None) ref = getattr(image_obj, "ref", None) or getattr(image_obj, "cid", None) or str(image_obj) parts.append(f"photo:{ref}") video = getattr(embed, "video", None) if video: ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video) parts.append(f"video:{ref}") external = getattr(embed, "external", None) if external: uri = getattr(external, "uri", None) or str(external) parts.append(f"external:{uri}") if not parts: return "no-media" parts.sort() raw = "|".join(parts) return hashlib.sha256(raw.encode("utf-8")).hexdigest() except Exception as e: logging.debug(f"Could not build Bluesky media fingerprint: {e}") return "no-media" def build_text_media_key(normalized_text, media_fingerprint): return hashlib.sha256(f"{normalized_text}||{media_fingerprint}".encode("utf-8")).hexdigest() def create_bsky_client(base_url, handle, password): """ Create a Bluesky/ATProto client pointed at the desired PDS or service host. Supports custom hosts like eurosky.social. """ normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/") logging.info(f"๐Ÿ” Connecting Bluesky client via base URL: {normalized_base_url}") try: client = Client(base_url=normalized_base_url) except TypeError: logging.warning("โš ๏ธ Your atproto Client does not accept base_url in constructor. Falling back.") client = Client() try: if hasattr(client, "base_url"): client.base_url = normalized_base_url elif hasattr(client, "_base_url"): client._base_url = normalized_base_url except Exception as e: logging.warning(f"โš ๏ธ Could not apply custom base URL cleanly: {e}") client.login(handle, password) return client # --- Local State Management --- def default_state(): return { "version": 1, "posted_tweets": {}, "posted_by_bsky_uri": {}, "updated_at": None, } def load_state(state_path=STATE_PATH): if not os.path.exists(state_path): logging.info(f"๐Ÿง  No state file found at {state_path}. Starting with empty memory.") return default_state() try: with open(state_path, "r", encoding="utf-8") as f: state = json.load(f) if not isinstance(state, dict): logging.warning("โš ๏ธ State file is invalid. Reinitializing.") return default_state() state.setdefault("version", 1) state.setdefault("posted_tweets", {}) state.setdefault("posted_by_bsky_uri", {}) state.setdefault("updated_at", None) return state except Exception as e: logging.warning(f"โš ๏ธ Could not load state file {state_path}: {e}. Reinitializing.") return default_state() def save_state(state, state_path=STATE_PATH): try: state["updated_at"] = arrow.utcnow().isoformat() temp_path = f"{state_path}.tmp" with open(temp_path, "w", encoding="utf-8") as f: json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True) os.replace(temp_path, state_path) logging.info(f"๐Ÿ’พ State saved to {state_path}") except Exception as e: logging.error(f"โŒ Failed to save state file {state_path}: {e}") def remember_posted_tweet(state, candidate, bsky_uri=None): """ Store successful post in local state. Primary key is canonical tweet URL when available. Fallback key uses text_media_key. """ canonical_tweet_url = candidate.get("canonical_tweet_url") fallback_key = f"textmedia:{candidate['text_media_key']}" state_key = canonical_tweet_url or fallback_key record = { "canonical_tweet_url": canonical_tweet_url, "normalized_text": candidate["normalized_text"], "raw_text": candidate["raw_text"], "media_fingerprint": candidate["media_fingerprint"], "text_media_key": candidate["text_media_key"], "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]), "bsky_uri": bsky_uri, "tweet_created_on": candidate["tweet"].created_on, "tweet_url": candidate["tweet"].tweet_url, "posted_at": arrow.utcnow().isoformat(), } state["posted_tweets"][state_key] = record if bsky_uri: state["posted_by_bsky_uri"][bsky_uri] = state_key def candidate_matches_state(candidate, state): """ Strong private dedupe using local persistent state. Match order: 1. canonical tweet URL 2. text + media fingerprint 3. normalized text """ canonical_tweet_url = candidate["canonical_tweet_url"] text_media_key = candidate["text_media_key"] normalized_text = candidate["normalized_text"] posted_tweets = state.get("posted_tweets", {}) if canonical_tweet_url and canonical_tweet_url in posted_tweets: return True, "state:tweet_url" for _, record in posted_tweets.items(): if record.get("text_media_key") == text_media_key: return True, "state:text_media_fingerprint" for _, record in posted_tweets.items(): if record.get("normalized_text") == normalized_text: return True, "state:normalized_text" return False, None def prune_state(state, max_entries=5000): """ Keep state file from growing forever. Prunes oldest records by posted_at if necessary. """ posted_tweets = state.get("posted_tweets", {}) if len(posted_tweets) <= max_entries: return state sortable = [] for key, record in posted_tweets.items(): posted_at = record.get("posted_at") or "" sortable.append((key, posted_at)) sortable.sort(key=lambda x: x[1], reverse=True) keep_keys = {key for key, _ in sortable[:max_entries]} new_posted_tweets = {} for key, record in posted_tweets.items(): if key in keep_keys: new_posted_tweets[key] = record new_posted_by_bsky_uri = {} for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items(): if key in keep_keys: new_posted_by_bsky_uri[bsky_uri] = key state["posted_tweets"] = new_posted_tweets state["posted_by_bsky_uri"] = new_posted_by_bsky_uri return state # --- Bluesky Post History --- def get_recent_bsky_posts(client, handle, limit=30): """ Fetch recent top-level Bluesky posts for duplicate detection. Returns a list of dicts with dedupe keys. """ recent_posts = [] try: timeline = client.get_author_feed(handle, limit=limit) for item in timeline.feed: try: if item.reason is not None: continue record = item.post.record if getattr(record, "reply", None) is not None: continue text = getattr(record, "text", "") or "" normalized_text = normalize_post_text(text) urls = [] urls.extend(extract_non_x_urls_from_text(text)) urls.extend(extract_urls_from_facets(record)) canonical_non_x_urls = set() for url in urls: if not is_x_or_twitter_domain(url): canonical = canonicalize_url(url) if canonical: canonical_non_x_urls.add(canonical) media_fingerprint = build_bsky_media_fingerprint(item.post) text_media_key = build_text_media_key(normalized_text, media_fingerprint) recent_posts.append({ "uri": getattr(item.post, "uri", None), "text": text, "normalized_text": normalized_text, "canonical_non_x_urls": canonical_non_x_urls, "media_fingerprint": media_fingerprint, "text_media_key": text_media_key, "created_at": getattr(record, "created_at", None), }) except Exception as e: logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}") except Exception as e: logging.warning(f"โš ๏ธ Could not fetch recent Bluesky posts for duplicate detection: {e}") return recent_posts def make_rich(content): text_builder = client_utils.TextBuilder() def repair_url(match): raw = match.group(0) if "\n" not in raw and "\r" not in raw: return re.sub(r"[โ€ฆ\.]+$", "", raw) glued = raw.replace("\n", "").replace("\r", "") test_url = re.sub(r"[โ€ฆ\.]+$", "", glued) if is_valid_url(test_url): return test_url parts = raw.split("\n") test_part0 = re.sub(r"[โ€ฆ\.]+$", "", parts[0]) if is_valid_url(test_part0): return raw return test_url content = re.sub(r"https?://[^\ \t]+", repair_url, content.strip()) lines = content.splitlines() for line_idx, line in enumerate(lines): if not line.strip(): if line_idx < len(lines) - 1: text_builder.text("\n") continue words = line.split(" ") for i, word in enumerate(words): if not word: if i < len(words) - 1: text_builder.text(" ") continue if word.startswith("http://") or word.startswith("https://"): if word.startswith("http://"): word = word.replace("http://", "https://", 1) word = re.sub(r"[โ€ฆ\.]+$", "", word) clean_url_value = clean_url(word) if clean_url_value and is_valid_url(clean_url_value): text_builder.link(clean_url_value, clean_url_value) else: text_builder.text(word) elif word.startswith("#"): clean_tag = word[1:].rstrip(".,;:!?)'\"โ€ฆ") text_builder.tag(word, clean_tag) else: text_builder.text(word) if i < len(words) - 1: text_builder.text(" ") if line_idx < len(lines) - 1: text_builder.text("\n") return text_builder def build_dynamic_alt(raw_text): dynamic_alt = raw_text.replace("\n", " ").strip() dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip() if len(dynamic_alt) > 150: dynamic_alt = dynamic_alt[:147] + "..." elif not dynamic_alt: dynamic_alt = "Attached video or image from tweet" return dynamic_alt def build_video_embed(video_blob, alt_text): try: return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text) except AttributeError: logging.error("โŒ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.") return None # --- Playwright Scraping --- def scrape_tweets_via_playwright(username, password, email, target_handle): tweets = [] state_file = "twitter_browser_state.json" with sync_playwright() as p: browser = p.chromium.launch( headless=True, args=["--disable-blink-features=AutomationControlled"] ) clean_ua = ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/145.0.7632.6 Safari/537.36" ) context = None needs_login = True if os.path.exists(state_file): logging.info("โœ… Found existing browser state. Attempting to bypass login...") context = browser.new_context( user_agent=clean_ua, viewport={"width": 1920, "height": 1080}, storage_state=state_file ) page = context.new_page() page.goto("https://x.com/home") time.sleep(4) if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url: logging.info("โœ… Session is valid!") needs_login = False else: logging.warning("โš ๏ธ Saved session expired or invalid. Re-logging in...") context.close() os.remove(state_file) if needs_login: logging.info("๐Ÿš€ Launching fresh browser for automated Twitter login...") context = browser.new_context( user_agent=clean_ua, viewport={"width": 1920, "height": 1080} ) page = context.new_page() try: page.goto("https://x.com") sign_in_button = page.get_by_text("Sign in", exact=True) sign_in_button.wait_for(state="visible", timeout=15000) sign_in_button.click(force=True) page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000) logging.info(f"๐Ÿ‘ค Entering username: {username}...") time.sleep(1) username_input = page.locator('input[autocomplete="username"]') username_input.wait_for(state="visible", timeout=15000) username_input.click(force=True) username_input.press_sequentially(username, delay=100) page.locator('button:has-text("Next")').first.click(force=True) page.wait_for_selector( 'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]', timeout=15000 ) time.sleep(1) if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible(): logging.warning("๐Ÿ›ก๏ธ Security challenge detected! Entering email/phone...") page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email) sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first if sec_next.is_visible(): sec_next.click(force=True) else: page.keyboard.press("Enter") page.wait_for_selector('input[name="password"]', timeout=15000) time.sleep(1) logging.info("๐Ÿ”‘ Entering password...") page.fill('input[name="password"]', password) page.locator('span:has-text("Log in")').first.click() page.wait_for_url("**/home", timeout=20000) time.sleep(3) context.storage_state(path=state_file) logging.info("โœ… Login successful. Browser state saved.") except Exception as e: take_error_screenshot(page, "login_failed") logging.error(f"โŒ Login failed: {e}") browser.close() return [] logging.info(f"๐ŸŒ Navigating to https://x.com/{target_handle} to scrape tweets...") page = context.new_page() page.goto(f"https://x.com/{target_handle}") try: page.wait_for_selector("article", timeout=20000) time.sleep(3) articles = page.locator("article").all() logging.info(f"๐Ÿ“Š Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...") for article in articles[:SCRAPE_TWEET_LIMIT]: try: time_el = article.locator("time").first if not time_el.is_visible(): continue created_at = time_el.get_attribute("datetime") tweet_url = None time_link = article.locator("a:has(time)").first if time_link.is_visible(): href = time_link.get_attribute("href") if href: tweet_url = f"https://x.com{href}" if href.startswith("/") else href text_locator = article.locator('[data-testid="tweetText"]').first text = text_locator.inner_text() if text_locator.is_visible() else "" media_urls = [] photo_locators = article.locator('[data-testid="tweetPhoto"] img').all() for img in photo_locators: src = img.get_attribute("src") if src: src = re.sub(r"&name=\w+", "&name=large", src) media_urls.append((src, "photo")) video_locators = article.locator('[data-testid="videoPlayer"]').all() if video_locators: media_urls.append((tweet_url or "", "video")) tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url)) except Exception as e: logging.warning(f"โš ๏ธ Failed to parse a specific tweet: {e}") continue except Exception as e: take_error_screenshot(page, "scrape_failed") logging.error(f"โŒ Failed to scrape profile: {e}") browser.close() return tweets def extract_video_url_from_tweet_page(context, tweet_url): page = context.new_page() best_m3u8_url = None best_video_mp4_url = None seen_urls = set() def is_audio_only_mp4(url, content_type): url_l = url.lower() content_type_l = content_type.lower() return ( "/aud/" in url_l or "/audio/" in url_l or "mp4a" in url_l or ("audio/" in content_type_l and "video/" not in content_type_l) ) def handle_response(response): nonlocal best_m3u8_url, best_video_mp4_url try: url = response.url if url in seen_urls: return seen_urls.add(url) url_l = url.lower() content_type = response.headers.get("content-type", "") content_type_l = content_type.lower() if ".m4s" in url_l: return if ( ".m3u8" in url_l or "application/vnd.apple.mpegurl" in content_type_l or "application/x-mpegurl" in content_type_l ): if best_m3u8_url is None: best_m3u8_url = url logging.info(f"๐Ÿ“บ Found HLS playlist URL: {url}") return if ".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l: if is_audio_only_mp4(url, content_type): logging.info(f"๐Ÿ”‡ Ignoring audio-only MP4: {url}") return if best_video_mp4_url is None: best_video_mp4_url = url logging.info(f"๐ŸŽฅ Found VIDEO MP4 URL: {url}") return except Exception as e: logging.debug(f"Response parsing error: {e}") page.on("response", handle_response) def current_best(): return best_m3u8_url or best_video_mp4_url try: logging.info(f"๐ŸŽฌ Opening tweet page to capture video URL: {tweet_url}") page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000) time.sleep(3) player = page.locator('[data-testid="videoPlayer"]').first if player.count() > 0: try: player.scroll_into_view_if_needed(timeout=5000) except Exception: pass try: player.click(force=True, timeout=5000) logging.info("โ–ถ๏ธ Clicked video player") except Exception as e: logging.info(f"โš ๏ธ First player click failed: {e}") else: logging.warning("โš ๏ธ No video player locator found on tweet page") for _ in range(12): if current_best(): break time.sleep(1) if not current_best() and player.count() > 0: logging.info("๐Ÿ” No media URL found yet, retrying player interaction...") try: player.click(force=True, timeout=5000) time.sleep(2) except Exception as e: logging.info(f"โš ๏ธ Retry click failed: {e}") try: page.keyboard.press("Space") time.sleep(1) except Exception: pass for _ in range(8): if current_best(): break time.sleep(1) selected_url = current_best() if selected_url: logging.info(f"โœ… Selected media URL for download: {selected_url}") else: logging.warning(f"โš ๏ธ No playable media URL detected on tweet page: {tweet_url}") return selected_url except Exception as e: logging.warning(f"โš ๏ธ Could not extract video URL from tweet page {tweet_url}: {e}") return None finally: page.close() # --- Video Processing --- def download_and_crop_video(video_url, output_path): """ Download, trim, and compress video before upload. """ temp_input = output_path.replace(".mp4", "_source.mp4") temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4") temp_output = output_path.replace(".mp4", "_compressed.mp4") try: logging.info(f"โฌ‡๏ธ Downloading video source with ffmpeg: {video_url}") video_url_l = video_url.lower() if ".m3u8" in video_url_l: logging.info("๐Ÿ“บ Using HLS ffmpeg mode") download_cmd = [ "ffmpeg", "-y", "-protocol_whitelist", "file,http,https,tcp,tls,crypto", "-allowed_extensions", "ALL", "-i", video_url, "-c", "copy", temp_input, ] else: logging.info("๐ŸŽฅ Using direct MP4 ffmpeg mode") download_cmd = [ "ffmpeg", "-y", "-i", video_url, "-c", "copy", temp_input, ] download_result = subprocess.run(download_cmd, capture_output=True, text=True) if download_result.returncode != 0: logging.error(f"โŒ ffmpeg download failed:\n{download_result.stderr}") return None if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0: logging.error("โŒ Downloaded video source file is missing or empty.") return None logging.info(f"โœ… Video downloaded: {temp_input}") video_clip = VideoFileClip(temp_input) duration = float(video_clip.duration) if video_clip.duration else 0 if duration <= 0: video_clip.close() logging.error("โŒ Downloaded video has invalid or unknown duration.") return None end_time = min(VIDEO_MAX_DURATION_SECONDS, duration) if hasattr(video_clip, "subclipped"): cropped_clip = video_clip.subclipped(0, end_time) else: cropped_clip = video_clip.subclip(0, end_time) cropped_clip.write_videofile( temp_trimmed, codec="libx264", audio_codec="aac", preset="veryfast", bitrate="1800k", audio_bitrate="128k", logger=None ) video_clip.close() cropped_clip.close() if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0: logging.error("โŒ Trimmed video output is missing or empty.") return None trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024) logging.info(f"๐Ÿ“ฆ Trimmed video size before compression: {trimmed_size_mb:.2f} MB") compress_cmd = [ "ffmpeg", "-y", "-i", temp_trimmed, "-vf", "scale='min(720,iw)':-2", "-c:v", "libx264", "-preset", "veryfast", "-crf", "30", "-maxrate", "1800k", "-bufsize", "3600k", "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", temp_output, ] compress_result = subprocess.run(compress_cmd, capture_output=True, text=True) if compress_result.returncode != 0: logging.error(f"โŒ ffmpeg compression failed:\n{compress_result.stderr}") return None if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0: logging.error("โŒ Compressed video output is missing or empty.") return None final_size_mb = os.path.getsize(temp_output) / (1024 * 1024) logging.info(f"โœ… Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)") os.replace(temp_output, output_path) logging.info(f"โœ… Final video ready: {output_path}") return output_path except Exception as e: logging.error(f"โŒ Error processing video: {repr(e)}") return None finally: for path in [temp_input, temp_trimmed, temp_output]: if os.path.exists(path): try: os.remove(path) except Exception: pass def candidate_matches_existing_bsky(candidate, recent_bsky_posts): """ Multi-signal dedupe against recent Bluesky posts. """ candidate_non_x_urls = candidate["canonical_non_x_urls"] candidate_text_media_key = candidate["text_media_key"] candidate_normalized_text = candidate["normalized_text"] for existing in recent_bsky_posts: existing_non_x_urls = existing["canonical_non_x_urls"] if ( candidate_non_x_urls and candidate_non_x_urls == existing_non_x_urls and candidate_normalized_text == existing["normalized_text"] ): return True, "bsky:normalized_text_plus_non_x_urls" if candidate_text_media_key == existing["text_media_key"]: return True, "bsky:text_media_fingerprint" if candidate_normalized_text == existing["normalized_text"]: return True, "bsky:normalized_text" return False, None # --- Main Sync Function --- def sync_feeds(args): logging.info("๐Ÿ”„ Starting sync cycle...") try: state = load_state(STATE_PATH) tweets = scrape_tweets_via_playwright( args.twitter_username, args.twitter_password, args.twitter_email, args.twitter_handle ) if not tweets: logging.warning("โš ๏ธ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.") return bsky_client = create_bsky_client( args.bsky_base_url, args.bsky_handle, args.bsky_password ) recent_bsky_posts = get_recent_bsky_posts( bsky_client, args.bsky_handle, limit=DEDUPE_BSKY_LIMIT ) logging.info(f"๐Ÿง  Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.") logging.info(f"๐Ÿง  Local state currently tracks {len(state.get('posted_tweets', {}))} posted items.") too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS) logging.info(f"๐Ÿ•’ Will ignore tweets older than: {too_old_cutoff}") candidate_tweets = [] for tweet in reversed(tweets): try: tweet_time = arrow.get(tweet.created_on) if tweet_time < too_old_cutoff: logging.info(f"โญ๏ธ Skipping old tweet from {tweet_time}") continue prepared_text = prepare_post_text(tweet.text) normalized_text = normalize_post_text(prepared_text) if not normalized_text: logging.info(f"โญ๏ธ Skipping empty/blank tweet from {tweet_time}") continue media_fingerprint = build_media_fingerprint(tweet) text_media_key = build_text_media_key(normalized_text, media_fingerprint) canonical_non_x_urls = set() for url in extract_non_x_urls_from_text(prepared_text): canonical = canonicalize_url(url) if canonical: canonical_non_x_urls.add(canonical) candidate_tweets.append({ "tweet": tweet, "tweet_time": tweet_time, "raw_text": prepared_text, "normalized_text": normalized_text, "media_fingerprint": media_fingerprint, "text_media_key": text_media_key, "canonical_tweet_url": canonicalize_tweet_url(tweet.tweet_url), "canonical_non_x_urls": canonical_non_x_urls, }) except Exception as e: logging.warning(f"โš ๏ธ Failed to prepare candidate tweet: {e}") logging.info(f"๐Ÿงช Prepared {len(candidate_tweets)} candidate tweets for duplicate comparison.") tweets_to_post = [] for candidate in candidate_tweets: is_dup_state, reason_state = candidate_matches_state(candidate, state) if is_dup_state: logging.info(f"โญ๏ธ Skipping candidate due to local state duplicate match on: {reason_state}") continue is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts) if is_dup_bsky: logging.info(f"โญ๏ธ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}") continue tweets_to_post.append(candidate) logging.info(f"๐Ÿ“ฌ {len(tweets_to_post)} tweets remain after duplicate filtering.") if not tweets_to_post: logging.info("โœ… No new tweets need posting after duplicate comparison.") return new_posts = 0 browser_state_file = "twitter_browser_state.json" with sync_playwright() as p, httpx.Client() as media_http_client: browser = p.chromium.launch( headless=True, args=["--disable-blink-features=AutomationControlled"] ) context_kwargs = { "user_agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/145.0.7632.6 Safari/537.36" ), "viewport": {"width": 1920, "height": 1080}, } if os.path.exists(browser_state_file): context_kwargs["storage_state"] = browser_state_file context = browser.new_context(**context_kwargs) for candidate in tweets_to_post: tweet = candidate["tweet"] tweet_time = candidate["tweet_time"] raw_text = candidate["raw_text"] logging.info(f"๐Ÿ“ Posting missing tweet from {tweet_time} to Bluesky...") rich_text = make_rich(raw_text) dynamic_alt = build_dynamic_alt(raw_text) image_embeds = [] video_embed = None external_embed = None media_upload_failures = [] if tweet.media: for media in tweet.media: if media.type == "photo": blob = get_blob_from_url(media.media_url_https, bsky_client, media_http_client) if blob: image_embeds.append( models.AppBskyEmbedImages.Image( alt=dynamic_alt, image=blob ) ) else: media_upload_failures.append(f"photo:{media.media_url_https}") elif media.type == "video": if not tweet.tweet_url: logging.warning("โš ๏ธ Tweet has video marker but no tweet URL. Skipping video.") media_upload_failures.append("video:no_tweet_url") continue temp_video_path = "temp_video.mp4" try: real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url) if not real_video_url: logging.warning(f"โš ๏ธ Could not resolve playable video URL for {tweet.tweet_url}") media_upload_failures.append(f"video:resolve_failed:{tweet.tweet_url}") continue cropped_video_path = download_and_crop_video(real_video_url, temp_video_path) if not cropped_video_path: logging.warning(f"โš ๏ธ Video download/crop failed for {tweet.tweet_url}") media_upload_failures.append(f"video:crop_failed:{tweet.tweet_url}") continue video_blob = get_blob_from_file(cropped_video_path, bsky_client) if not video_blob: logging.warning(f"โš ๏ธ Video upload blob failed for {tweet.tweet_url}") media_upload_failures.append(f"video:upload_failed:{tweet.tweet_url}") continue video_embed = build_video_embed(video_blob, dynamic_alt) if not video_embed: media_upload_failures.append(f"video:embed_failed:{tweet.tweet_url}") finally: if os.path.exists(temp_video_path): os.remove(temp_video_path) # Only create an external link card if no image/video embed will be used. if not video_embed and not image_embeds and candidate["canonical_non_x_urls"]: first_non_x_url = sorted(candidate["canonical_non_x_urls"])[0] external_embed = build_external_link_embed( first_non_x_url, bsky_client, media_http_client, fallback_title="Link" ) if external_embed: logging.info(f"๐Ÿ”— Built external link card for URL: {first_non_x_url}") else: logging.info(f"โ„น๏ธ No external link card metadata available for URL: {first_non_x_url}") try: post_result = None post_mode = "text" if video_embed: post_result = bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"]) post_mode = "video" elif image_embeds: embed = models.AppBskyEmbedImages.Main(images=image_embeds) post_result = bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"]) post_mode = f"images:{len(image_embeds)}" elif external_embed: post_result = bsky_client.send_post(text=rich_text, embed=external_embed, langs=["ca"]) post_mode = "external_link_card" else: post_result = bsky_client.send_post(text=rich_text, langs=["ca"]) post_mode = "text_only" bsky_uri = getattr(post_result, "uri", None) remember_posted_tweet(state, candidate, bsky_uri=bsky_uri) state = prune_state(state, max_entries=5000) save_state(state, STATE_PATH) recent_bsky_posts.insert(0, { "uri": bsky_uri, "text": raw_text, "normalized_text": candidate["normalized_text"], "canonical_non_x_urls": candidate["canonical_non_x_urls"], "media_fingerprint": candidate["media_fingerprint"], "text_media_key": candidate["text_media_key"], "created_at": arrow.utcnow().isoformat(), }) recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] new_posts += 1 if media_upload_failures: logging.warning( f"โœ… Posted tweet to Bluesky with degraded media mode ({post_mode}). " f"Failed media items: {media_upload_failures}" ) else: logging.info(f"โœ… Posted new tweet to Bluesky with mode {post_mode}: {raw_text}") time.sleep(5) except Exception as e: logging.error(f"โŒ Failed to post tweet to Bluesky: {e}") browser.close() logging.info(f"โœ… Sync complete. Posted {new_posts} new updates.") except Exception as e: logging.error(f"โŒ Error during sync cycle: {e}") # --- Main Execution --- def main(): load_dotenv() parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync") parser.add_argument("--twitter-username", help="Your Twitter login username") parser.add_argument("--twitter-password", help="Your Twitter login password") parser.add_argument("--twitter-email", help="Your Twitter email for security challenges") parser.add_argument("--twitter-handle", help="The Twitter account to scrape") parser.add_argument("--bsky-handle", help="Your Bluesky handle") parser.add_argument("--bsky-password", help="Your Bluesky app password") parser.add_argument("--bsky-base-url", help="Bluesky/ATProto PDS base URL, e.g. https://eurosky.social") args = parser.parse_args() args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME") args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD") args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL") args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username args.bsky_base_url = args.bsky_base_url if args.bsky_base_url else DEFAULT_BSKY_BASE_URL missing_args = [] if not args.twitter_username: missing_args.append("--twitter-username") if not args.twitter_password: missing_args.append("--twitter-password") if not args.bsky_handle: missing_args.append("--bsky-handle") if not args.bsky_password: missing_args.append("--bsky-password") if missing_args: logging.error(f"โŒ Missing credentials! You forgot to provide: {', '.join(missing_args)}") return logging.info(f"๐Ÿค– Bot started. Will check @{args.twitter_handle}") logging.info(f"๐ŸŒ Posting destination base URL: {args.bsky_base_url}") sync_feeds(args) logging.info("๐Ÿค– Bot finished.") if __name__ == "__main__": main()