"""Twitter/X → Bluesky cross-poster: scraping, text repair, media upload helpers.

This module section holds configuration, logging setup, the scraped-tweet data
holders, and the pure-ish helper layer: URL/mention repair for mangled scraped
text, Bluesky blob upload with rate-limit retry, external-card thumbnail
compression, link metadata fetching, and text/media fingerprints used for
duplicate detection.
"""

import argparse
import arrow
import hashlib
import io
import json
import logging
import re
import httpx
import time
import os
import subprocess
from urllib.parse import urlparse
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip
from bs4 import BeautifulSoup
from PIL import Image

# --- Configuration ---
LOG_PATH = "twitter2bsky.log"                 # rotating not configured; single append log
STATE_PATH = "twitter2bsky_state.json"        # persisted dedupe memory (see load_state/save_state)
SCRAPE_TWEET_LIMIT = 30                       # max articles parsed per scrape run
DEDUPE_BSKY_LIMIT = 30                        # recent Bluesky posts fetched for duplicate checks
TWEET_MAX_AGE_DAYS = 3
BSKY_TEXT_MAX_LENGTH = 275                    # just under Bluesky's 300-grapheme cap, leaves slack
VIDEO_MAX_DURATION_SECONDS = 179              # trim to stay under the ~3 min video limit
MAX_VIDEO_UPLOAD_SIZE_MB = 45
# External-card thumbnail constraints:
# Conservative safe target below the observed PDS max (~976.56 KB).
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024
EXTERNAL_THUMB_MAX_DIMENSION = 1200
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10              # seconds; doubled per retry (exponential backoff)
BSKY_BLOB_UPLOAD_MAX_DELAY = 300              # backoff/rate-limit wait ceiling, seconds
MEDIA_DOWNLOAD_TIMEOUT = 30
LINK_METADATA_TIMEOUT = 10
DEFAULT_BSKY_BASE_URL = "https://bsky.social"

# --- Logging Setup ---
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
    level=logging.INFO,
)


# --- Custom Classes ---
class ScrapedMedia:
    """One media attachment of a scraped tweet (photo URL or video marker)."""

    def __init__(self, url, media_type="photo"):
        self.type = media_type            # "photo" or "video"
        self.media_url_https = url        # for videos this holds the tweet URL, not a media URL


class ScrapedTweet:
    """Minimal tweet record produced by the Playwright scraper."""

    def __init__(self, created_on, text, media_urls, tweet_url=None):
        self.created_on = created_on      # ISO datetime string from the <time> element
        self.text = text
        self.tweet_url = tweet_url
        # media_urls is a list of (url, media_type) pairs.
        self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls]


# --- Helpers ---
def take_error_screenshot(page, error_msg):
    """Save a timestamped screenshot of the current Playwright page for debugging."""
    logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    screenshot_name = f"screenshot_{timestamp}.png"
    page.screenshot(path=screenshot_name)
    logging.info(f"📸 Screenshot saved as: {screenshot_name}")


def is_valid_url(url):
    """Return True if a HEAD request succeeds with a non-5xx status.

    NOTE(review): this performs a live network request per call; make_rich()
    calls it per URL token, so posting latency depends on remote hosts.
    """
    try:
        response = httpx.head(url, timeout=5, follow_redirects=True)
        return response.status_code < 500
    except Exception:
        return False


def strip_trailing_url_punctuation(url):
    """Drop trailing whitespace/ellipsis/punctuation that got glued onto a URL."""
    if not url:
        return url
    return re.sub(r"[\s…\.,;:!?)\]\"']+$", "", url.strip())


def repair_broken_urls(text):
    """
    Repair URLs that were split by copied/scraped line breaks.

    Examples:
        https://
        3cat.cat/path
    becomes: https://3cat.cat/path

        https://3cat.cat/some-pa
        th/article
    becomes: https://3cat.cat/some-path/article
    """
    if not text:
        return text
    original = text
    # Join a scheme that ended a line with the host on the next line.
    text = re.sub(r"(https?://)\s*[\r\n]+\s*", r"\1", text, flags=re.IGNORECASE)
    # Repeat until fix-point: each pass can expose another split to join.
    prev_text = None
    while prev_text != text:
        prev_text = text
        # URL ending a line, continuation on the next line.
        text = re.sub(
            r"((?:https?://|www\.)[^\s<>\"]*)[\r\n]+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)",
            r"\1\2",
            text,
            flags=re.IGNORECASE
        )
        # URL split by plain whitespace.
        # NOTE(review): this also glues an ordinary following word onto a URL
        # ("see https://a.b now" -> "https://a.bnow"); confirm this is the
        # intended trade-off for scraped tweet text.
        text = re.sub(
            r"((?:https?://|www\.)[^\s<>\"]*)\s+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)",
            r"\1\2",
            text,
            flags=re.IGNORECASE
        )
    if text != original:
        logging.info("🔧 Repaired broken URL wrapping in scraped text")
    return text


def repair_broken_mentions(text):
    """
    Repair mention-related line wrapping in scraped text.

    Handles cases like:
        Ho explica
        @martamartorell
        La @sanfenerea
        tenia un repte
        Hospital
        @parctauli
        .
        conjunt @bomberscat
        -SEM.
    becoming:
        Ho explica @martamartorell La @sanfenerea tenia un repte
        Hospital @parctauli . conjunt @bomberscat -SEM.
    while preserving real paragraph breaks and standalone mention lines.
    """
    if not text:
        return text
    lines = text.splitlines()
    result = []
    i = 0
    changed = False

    def is_mention_only_line(s):
        # Line consists solely of one @handle (word chars only).
        return bool(re.fullmatch(r"@[A-Za-z0-9_]+", s.strip()))

    def is_blank_line(s):
        return not s.strip()

    while i < len(lines):
        current = lines[i]
        stripped = current.strip()
        if is_blank_line(current):
            # Keep intentional paragraph breaks.
            result.append("")
            i += 1
            continue
        # If current line is only a mention, try to attach it backward.
        if is_mention_only_line(current):
            if result and result[-1].strip():
                result[-1] = result[-1].rstrip() + " " + stripped
                changed = True
            else:
                # Nothing to attach to: keep the mention as its own line.
                result.append(stripped)
            i += 1
            # Attach immediately following continuation lines if they are not blank
            # and not another standalone mention.
            while i < len(lines):
                next_line = lines[i]
                next_stripped = next_line.strip()
                if is_blank_line(next_line):
                    break
                if is_mention_only_line(next_line):
                    break
                result[-1] = result[-1].rstrip() + " " + next_stripped
                changed = True
                i += 1
                if i < len(lines) and is_blank_line(lines[i]):
                    break
            continue
        # If current line has text and next line is a mention, merge them.
        if i + 1 < len(lines) and is_mention_only_line(lines[i + 1]):
            merged = stripped + " " + lines[i + 1].strip()
            changed = True
            i += 2
            # Pull in trailing continuation lines the same way as above.
            while i < len(lines):
                next_line = lines[i]
                next_stripped = next_line.strip()
                if is_blank_line(next_line):
                    break
                if is_mention_only_line(next_line):
                    break
                merged = merged.rstrip() + " " + next_stripped
                changed = True
                i += 1
                if i < len(lines) and is_blank_line(lines[i]):
                    break
            result.append(merged)
            continue
        result.append(stripped)
        i += 1

    new_text = "\n".join(result)
    if changed:
        logging.info("🔧 Repaired broken mention wrapping in scraped text")
    return new_text


def strip_line_edge_whitespace(text):
    """
    Remove leading/trailing whitespace from each line while preserving
    the line structure and intentional blank lines.
    """
    if not text:
        return text
    lines = text.splitlines()
    cleaned_lines = []
    changed = False
    for line in lines:
        cleaned = line.strip()
        if cleaned != line:
            changed = True
        cleaned_lines.append(cleaned)
    new_text = "\n".join(cleaned_lines)
    if changed:
        logging.info("🔧 Stripped leading/trailing whitespace from scraped text lines")
    return new_text


def remove_trailing_ellipsis_line(text):
    """
    Remove trailing lines that are only ellipsis markers.

    Handles:
    - ...
    - …
    """
    if not text:
        return text
    lines = text.splitlines()
    while lines and lines[-1].strip() in {"...", "…"}:
        lines.pop()
    return "\n".join(lines).strip()


def clean_url(url):
    """Collapse internal whitespace, strip trailing punctuation, and validate.

    Returns the cleaned URL, or None if it does not respond (see is_valid_url).
    """
    trimmed_url = url.strip()
    cleaned_url = re.sub(r"\s+", "", trimmed_url)
    cleaned_url = strip_trailing_url_punctuation(cleaned_url)
    if is_valid_url(cleaned_url):
        return cleaned_url
    return None


def canonicalize_url(url):
    """Normalize a URL for dedupe comparison (trim + drop trailing punctuation)."""
    if not url:
        return None
    return strip_trailing_url_punctuation(url.strip())


def canonicalize_tweet_url(url):
    """Map any x.com/twitter.com status URL to a canonical lowercase x.com form.

    Non-status URLs are just lowercased so comparisons stay case-insensitive.
    """
    if not url:
        return None
    url = url.strip()
    match = re.search(r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", url, re.IGNORECASE)
    if not match:
        return url.lower()
    handle = match.group(1).lower()
    tweet_id = match.group(2)
    return f"https://x.com/{handle}/status/{tweet_id}"


def is_x_or_twitter_domain(url):
    """True if the URL's host is an X/Twitter domain (used to skip self-links)."""
    try:
        hostname = (urlparse(url).hostname or "").lower()
        return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"}
    except Exception:
        return False


def extract_urls_from_text(text):
    """Return all http(s) URLs found in text, after repairing wrapped URLs."""
    if not text:
        return []
    repaired = repair_broken_urls(text)
    return re.findall(r"https?://[^\s]+", repaired)


def extract_non_x_urls_from_text(text):
    """Return cleaned URLs from text, excluding X/Twitter domains."""
    urls = extract_urls_from_text(text)
    result = []
    for url in urls:
        cleaned = strip_trailing_url_punctuation(url)
        if cleaned and not is_x_or_twitter_domain(cleaned):
            result.append(cleaned)
    return result


def extract_ordered_non_x_urls(text):
    """Canonical non-X URLs from text, de-duplicated, first-seen order preserved."""
    seen = set()
    ordered = []
    for url in extract_non_x_urls_from_text(text):
        canonical = canonicalize_url(url)
        if canonical and canonical not in seen:
            seen.add(canonical)
            ordered.append(canonical)
    return ordered


def extract_urls_from_facets(record):
    """Collect link URIs from a Bluesky record's rich-text facets (best effort)."""
    urls = []
    try:
        facets = getattr(record, "facets", None) or []
        for facet in facets:
            features = getattr(facet, "features", None) or []
            for feature in features:
                uri = getattr(feature, "uri", None)
                if uri:
                    urls.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")
    return urls


def looks_like_title_plus_url_post(text):
    """Heuristic: post is "headline text" lines followed by exactly one URL line."""
    if not text:
        return False
    repaired = repair_broken_urls(text)
    repaired = strip_line_edge_whitespace(repaired)
    lines = [line.strip() for line in repaired.splitlines() if line.strip()]
    if len(lines) < 2:
        return False
    last_line = lines[-1]
    urls_in_last_line = extract_ordered_non_x_urls(last_line)
    total_urls = extract_ordered_non_x_urls(repaired)
    # Exactly one non-X URL in the whole post, and it is the final line.
    return len(urls_in_last_line) == 1 and len(total_urls) == 1 and last_line.startswith(("http://", "https://"))


def get_rate_limit_wait_seconds(error_obj, default_delay):
    """Derive a wait from the server's ratelimit-reset header, else default_delay.

    The result is clamped to BSKY_BLOB_UPLOAD_MAX_DELAY and never below
    default_delay (the caller's backoff value).
    """
    try:
        headers = getattr(error_obj, "headers", None)
        if headers:
            reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
            if reset_value:
                now_ts = int(time.time())
                reset_ts = int(reset_value)    # header is an absolute epoch timestamp
                wait_seconds = max(reset_ts - now_ts + 1, default_delay)
                return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY)
    except Exception:
        pass
    return default_delay


def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload bytes as a Bluesky blob, retrying only on rate-limit errors.

    Non-rate-limit failures abort immediately. Returns the blob ref or None.
    """
    last_exception = None
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            error_text = str(e)
            # Rate limiting is detected by substring match on the error text.
            is_rate_limited = "429" in error_text or "RateLimitExceeded" in error_text
            if not is_rate_limited:
                logging.warning(f"Could not upload {media_label}: {repr(e)}")
                if hasattr(e, "response") and e.response is not None:
                    try:
                        logging.warning(f"Upload response status: {e.response.status_code}")
                        logging.warning(f"Upload response body: {e.response.text}")
                    except Exception:
                        pass
                return None
            # Exponential backoff, possibly overridden by the server's reset header.
            backoff_delay = min(
                BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                BSKY_BLOB_UPLOAD_MAX_DELAY
            )
            wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
            if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                logging.warning(
                    f"⏳ Bluesky blob upload rate-limited for {media_label}. "
                    f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
            else:
                logging.warning(
                    f"❌ Exhausted blob upload retries for {media_label} after rate limiting: {repr(e)}"
                )
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None


def get_blob_from_url(media_url, client, http_client):
    """Download media from a URL and upload it as a Bluesky blob. None on failure."""
    try:
        r = http_client.get(media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True)
        if r.status_code != 200:
            logging.warning(f"Could not fetch media {media_url}: HTTP {r.status_code}")
            return None
        content = r.content
        if not content:
            logging.warning(f"Could not fetch media {media_url}: empty response body")
            return None
        return upload_blob_with_retry(client, content, media_label=media_url)
    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {repr(e)}")
        return None


def get_blob_from_file(file_path, client):
    """Upload a local file as a Bluesky blob, size-guarding .mp4 uploads."""
    try:
        if not os.path.exists(file_path):
            logging.warning(f"Could not upload local file {file_path}: file does not exist")
            return None
        file_size = os.path.getsize(file_path)
        file_size_mb = file_size / (1024 * 1024)
        logging.info(f"📦 Uploading local file {file_path} ({file_size_mb:.2f} MB)")
        # Only videos get the size cap; other file types are uploaded as-is.
        if file_path.lower().endswith(".mp4") and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
            logging.warning(
                f"Could not upload local file {file_path}: "
                f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)"
            )
            return None
        with open(file_path, "rb") as f:
            binary_data = f.read()
        return upload_blob_with_retry(client, binary_data, media_label=file_path)
    except Exception as e:
        logging.warning(f"Could not upload local file {file_path}: {repr(e)}")
        if hasattr(e, "response") and e.response is not None:
            try:
                logging.warning(f"Upload response status: {e.response.status_code}")
                logging.warning(f"Upload response body: {e.response.text}")
            except Exception:
                pass
        return None


def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES):
    """
    Compress/resize an image to fit external thumbnail blob size limits.
    Returns JPEG bytes or None.
    """
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            img = img.convert("RGB")   # JPEG cannot carry alpha/palette modes
            width, height = img.size
            max_dim = max(width, height)
            # First pass: cap the longest side, then walk quality downward.
            if max_dim > EXTERNAL_THUMB_MAX_DIMENSION:
                scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim
                new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
                img = img.resize(new_size, Image.LANCZOS)
                logging.info(f"🖼️ Resized external thumb to {new_size[0]}x{new_size[1]}")
            for quality in [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                out = io.BytesIO()
                img.save(out, format="JPEG", quality=quality, optimize=True, progressive=True)
                data = out.getvalue()
                logging.info(
                    f"🖼️ External thumb candidate size at JPEG quality {quality}: "
                    f"{len(data) / 1024:.2f} KB"
                )
                if len(data) <= max_bytes:
                    return data
            # Second pass: progressively smaller dimensions at low quality.
            for target_dim in [1000, 900, 800, 700, 600]:
                resized = img.copy()
                width, height = resized.size
                max_dim = max(width, height)
                if max_dim > target_dim:
                    scale = target_dim / max_dim
                    new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
                    resized = resized.resize(new_size, Image.LANCZOS)
                for quality in [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                    out = io.BytesIO()
                    resized.save(out, format="JPEG", quality=quality, optimize=True, progressive=True)
                    data = out.getvalue()
                    logging.info(
                        f"🖼️ External thumb resized to <= {target_dim}px at quality {quality}: "
                        f"{len(data) / 1024:.2f} KB"
                    )
                    if len(data) <= max_bytes:
                        return data
    except Exception as e:
        logging.warning(f"Could not compress external thumbnail: {repr(e)}")
    # Nothing fit under max_bytes.
    return None


def get_external_thumb_blob_from_url(image_url, client, http_client):
    """
    Download, size-check, compress if needed, and upload an external-card
    thumbnail blob. If the image cannot fit within the PDS blob limit,
    return None so the external card can still be posted without a thumbnail.
    """
    try:
        r = http_client.get(image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True)
        if r.status_code != 200:
            logging.warning(f"Could not fetch external thumb {image_url}: HTTP {r.status_code}")
            return None
        content = r.content
        if not content:
            logging.warning(f"Could not fetch external thumb {image_url}: empty body")
            return None
        original_size_kb = len(content) / 1024
        logging.info(f"🖼️ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)")
        upload_bytes = content
        if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES:
            logging.info(
                f"🖼️ External thumb exceeds safe limit "
                f"({original_size_kb:.2f} KB > {EXTERNAL_THUMB_MAX_BYTES / 1024:.2f} KB). Compressing..."
            )
            compressed = compress_external_thumb_to_limit(upload_bytes, EXTERNAL_THUMB_MAX_BYTES)
            if compressed:
                upload_bytes = compressed
                logging.info(f"✅ External thumb compressed to {len(upload_bytes) / 1024:.2f} KB")
            else:
                logging.warning("⚠️ Could not compress external thumb to fit limit. Will omit thumbnail.")
                return None
        else:
            logging.info("✅ External thumb already within safe size limit.")
        blob = upload_blob_with_retry(client, upload_bytes, media_label=f"external-thumb:{image_url}")
        if blob:
            return blob
        logging.warning("⚠️ External thumb upload failed. Will omit thumbnail.")
        return None
    except Exception as e:
        logging.warning(f"Could not fetch/upload external thumb {image_url}: {repr(e)}")
        return None


def fetch_link_metadata(url, http_client):
    """Fetch Open Graph title/description/image for a URL; {} on any failure."""
    try:
        r = http_client.get(url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        # Prefer og: tags, fall back to generic equivalents.
        title = (soup.find("meta", property="og:title") or soup.find("title"))
        desc = (
            soup.find("meta", property="og:description")
            or soup.find("meta", attrs={"name": "description"})
        )
        image = (
            soup.find("meta", property="og:image")
            or soup.find("meta", attrs={"name": "twitter:image"})
        )
        return {
            "title": title["content"] if title and title.has_attr("content") else (title.text.strip() if title and title.text else ""),
            "description": desc["content"] if desc and desc.has_attr("content") else "",
            "image": image["content"] if image and image.has_attr("content") else None,
        }
    except Exception as e:
        logging.warning(f"Could not fetch link metadata for {url}: {repr(e)}")
        return {}


def build_external_link_embed(url, client, http_client, fallback_title="Link"):
    """
    Build a Bluesky external embed from a URL. If the thumbnail image is too
    large, omit the thumbnail but still return the link card.
    """
    link_metadata = fetch_link_metadata(url, http_client)
    thumb_blob = None
    if link_metadata.get("image"):
        thumb_blob = get_external_thumb_blob_from_url(link_metadata["image"], client, http_client)
        if thumb_blob:
            logging.info("✅ External link card thumbnail prepared successfully")
        else:
            logging.info("ℹ️ External link card will be posted without thumbnail")
    # Only emit a card when there is at least some metadata or a thumb.
    if link_metadata.get("title") or link_metadata.get("description") or thumb_blob:
        return models.AppBskyEmbedExternal.Main(
            external=models.AppBskyEmbedExternal.External(
                uri=url,
                title=link_metadata.get("title") or fallback_title,
                description=link_metadata.get("description") or "",
                thumb=thumb_blob,
            )
        )
    return None


def prepare_post_text(text):
    """Clean scraped text and truncate at a word boundary to the Bluesky limit."""
    raw_text = (text or "").strip()
    raw_text = repair_broken_urls(raw_text)
    raw_text = repair_broken_mentions(raw_text)
    raw_text = strip_line_edge_whitespace(raw_text)
    raw_text = remove_trailing_ellipsis_line(raw_text)
    if len(raw_text) > BSKY_TEXT_MAX_LENGTH:
        truncated = raw_text[:BSKY_TEXT_MAX_LENGTH - 3]   # reserve room for "..."
        last_space = truncated.rfind(" ")
        if last_space > 0:
            raw_text = truncated[:last_space] + "..."
        else:
            raw_text = truncated + "..."
    return raw_text.strip()


def normalize_post_text(text):
    """Collapse a post to lowercase single-spaced text for fuzzy dedupe matching."""
    if not text:
        return ""
    text = repair_broken_urls(text)
    text = repair_broken_mentions(text)
    text = strip_line_edge_whitespace(text)
    text = remove_trailing_ellipsis_line(text)
    text = text.replace("\r", "\n")
    text = re.sub(r"\s+", " ", text).strip()
    return text.lower()


def build_media_fingerprint(tweet):
    """Stable hash of a scraped tweet's media set (order-independent)."""
    if not tweet or not tweet.media:
        return "no-media"
    parts = []
    for media in tweet.media:
        media_type = getattr(media, "type", "unknown")
        media_url = getattr(media, "media_url_https", "") or ""
        stable_value = media_url
        if media_type == "photo":
            # Drop size/format query params so the same photo always hashes alike.
            stable_value = re.sub(r"[?&]name=\w+", "", stable_value)
            stable_value = re.sub(r"[?&]format=\w+", "", stable_value)
        elif media_type == "video":
            # Videos have no stable media URL from scraping; key on the tweet URL.
            stable_value = canonicalize_tweet_url(tweet.tweet_url or media_url or "")
        parts.append(f"{media_type}:{stable_value}")
    parts.sort()
    raw = "|".join(parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def build_bsky_media_fingerprint(post_view):
    """Stable hash of an existing Bluesky post's embed (images/video/external)."""
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"
        parts = []
        images = getattr(embed, "images", None)
        if images:
            for img in images:
                image_obj = getattr(img, "image", None)
                # Prefer the blob ref/CID; fall back to repr for unknown shapes.
                ref = getattr(image_obj, "ref", None) or getattr(image_obj, "cid", None) or str(image_obj)
                parts.append(f"photo:{ref}")
        video = getattr(embed, "video", None)
        if video:
            ref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
            parts.append(f"video:{ref}")
        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")
        if not parts:
            return "no-media"
        parts.sort()
        raw = "|".join(parts)
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"


def build_text_media_key(normalized_text, media_fingerprint):
    """Combined dedupe key over normalized text and the media fingerprint."""
    return hashlib.sha256(f"{normalized_text}||{media_fingerprint}".encode("utf-8")).hexdigest()
def create_bsky_client(base_url, handle, password):
    """Create and log in an atproto Client against the given (or default) PDS.

    Tolerates older atproto versions whose Client() does not accept base_url.
    """
    normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"🔐 Connecting Bluesky client via base URL: {normalized_base_url}")
    try:
        client = Client(base_url=normalized_base_url)
    except TypeError:
        # Older SDK: construct plain, then try to patch the base URL attribute.
        logging.warning("⚠️ Your atproto Client does not accept base_url in constructor. Falling back.")
        client = Client()
        try:
            if hasattr(client, "base_url"):
                client.base_url = normalized_base_url
            elif hasattr(client, "_base_url"):
                client._base_url = normalized_base_url
        except Exception as e:
            logging.warning(f"⚠️ Could not apply custom base URL cleanly: {e}")
    client.login(handle, password)
    return client


def default_state():
    """Fresh, empty persisted-state structure (dedupe memory)."""
    return {
        "version": 1,
        "posted_tweets": {},        # state_key -> posted-tweet record
        "posted_by_bsky_uri": {},   # bsky post uri -> state_key (reverse index)
        "updated_at": None,
    }


def load_state(state_path=STATE_PATH):
    """Load persisted dedupe state from disk; reinitialize on any problem."""
    if not os.path.exists(state_path):
        logging.info(f"🧠 No state file found at {state_path}. Starting with empty memory.")
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
        if not isinstance(state, dict):
            logging.warning("⚠️ State file is invalid. Reinitializing.")
            return default_state()
        # Backfill any missing keys from older state versions.
        state.setdefault("version", 1)
        state.setdefault("posted_tweets", {})
        state.setdefault("posted_by_bsky_uri", {})
        state.setdefault("updated_at", None)
        return state
    except Exception as e:
        logging.warning(f"⚠️ Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()


def save_state(state, state_path=STATE_PATH):
    """Atomically persist state (write to temp file, then os.replace)."""
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        temp_path = f"{state_path}.tmp"
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, state_path)
        logging.info(f"💾 State saved to {state_path}")
    except Exception as e:
        logging.error(f"❌ Failed to save state file {state_path}: {e}")


def remember_posted_tweet(state, candidate, bsky_uri=None):
    """Record a posted candidate in state, keyed by tweet URL or text+media hash."""
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    fallback_key = f"textmedia:{candidate['text_media_key']}"
    state_key = canonical_tweet_url or fallback_key
    record = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []),
        "bsky_uri": bsky_uri,
        "tweet_created_on": candidate["tweet"].created_on,
        "tweet_url": candidate["tweet"].tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }
    state["posted_tweets"][state_key] = record
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key


def candidate_matches_state(candidate, state):
    """Check whether a candidate was already posted, by URL, key, or text.

    Returns (matched, reason) where reason names which signal matched.
    """
    canonical_tweet_url = candidate["canonical_tweet_url"]
    text_media_key = candidate["text_media_key"]
    normalized_text = candidate["normalized_text"]
    posted_tweets = state.get("posted_tweets", {})
    # Strongest signal first: exact tweet URL.
    if canonical_tweet_url and canonical_tweet_url in posted_tweets:
        return True, "state:tweet_url"
    for _, record in posted_tweets.items():
        if record.get("text_media_key") == text_media_key:
            return True, "state:text_media_fingerprint"
    for _, record in posted_tweets.items():
        if record.get("normalized_text") == normalized_text:
            return True, "state:normalized_text"
    return False, None


def prune_state(state, max_entries=5000):
    """Cap posted_tweets at max_entries, keeping the most recently posted.

    The reverse index (posted_by_bsky_uri) is pruned to match.
    """
    posted_tweets = state.get("posted_tweets", {})
    if len(posted_tweets) <= max_entries:
        return state
    sortable = []
    for key, record in posted_tweets.items():
        posted_at = record.get("posted_at") or ""
        sortable.append((key, posted_at))
    # ISO timestamps sort lexicographically; newest first.
    sortable.sort(key=lambda x: x[1], reverse=True)
    keep_keys = {key for key, _ in sortable[:max_entries]}
    new_posted_tweets = {}
    for key, record in posted_tweets.items():
        if key in keep_keys:
            new_posted_tweets[key] = record
    new_posted_by_bsky_uri = {}
    for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items():
        if key in keep_keys:
            new_posted_by_bsky_uri[bsky_uri] = key
    state["posted_tweets"] = new_posted_tweets
    state["posted_by_bsky_uri"] = new_posted_by_bsky_uri
    return state


def get_recent_bsky_posts(client, handle, limit=30):
    """Fetch the account's recent original posts as dedupe-comparison records.

    Reposts (item.reason set) and replies are skipped. Each returned dict
    carries the same normalized text / URL / fingerprint fields used for
    candidate matching.
    """
    recent_posts = []
    try:
        timeline = client.get_author_feed(handle, limit=limit)
        for item in timeline.feed:
            try:
                if item.reason is not None:   # repost/pin wrapper — not our own content
                    continue
                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue
                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)
                # URLs can live in the text and/or in rich-text facets.
                urls = []
                urls.extend(extract_non_x_urls_from_text(text))
                urls.extend(extract_urls_from_facets(record))
                canonical_non_x_urls = set()
                for url in urls:
                    if not is_x_or_twitter_domain(url):
                        canonical = canonicalize_url(url)
                        if canonical:
                            canonical_non_x_urls.add(canonical)
                media_fingerprint = build_bsky_media_fingerprint(item.post)
                text_media_key = build_text_media_key(normalized_text, media_fingerprint)
                recent_posts.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalized_text,
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": text_media_key,
                    "created_at": getattr(record, "created_at", None),
                })
            except Exception as e:
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}")
    return recent_posts


def make_rich(content):
    """Build a Bluesky TextBuilder from cleaned text, linking URLs and hashtags.

    Word-by-word walk: URL tokens become rich links (upgraded to https and
    validated live), #tags become tag facets, everything else is plain text.
    Line/word spacing of the cleaned input is preserved.
    """
    text_builder = client_utils.TextBuilder()
    content = repair_broken_urls(content.strip())
    content = repair_broken_mentions(content)
    content = strip_line_edge_whitespace(content)
    content = remove_trailing_ellipsis_line(content)
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        if not line.strip():
            # Preserve blank lines except after the final line.
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue
        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                # Consecutive spaces produce empty split tokens; re-emit the space.
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue
            if word.startswith("http://") or word.startswith("https://"):
                if word.startswith("http://"):
                    word = word.replace("http://", "https://", 1)
                word = strip_trailing_url_punctuation(word)
                clean_url_value = clean_url(word)
                if clean_url_value and is_valid_url(clean_url_value):
                    text_builder.link(clean_url_value, clean_url_value)
                else:
                    # Unreachable/invalid URL: keep it as plain text.
                    text_builder.text(word)
            elif word.startswith("#"):
                clean_tag = word[1:].rstrip(".,;:!?)'\"…")
                text_builder.tag(word, clean_tag)
            else:
                text_builder.text(word)
            if i < len(words) - 1:
                text_builder.text(" ")
        if line_idx < len(lines) - 1:
            text_builder.text("\n")
    return text_builder


def build_dynamic_alt(raw_text):
    """Derive an alt-text string (<=150 chars, URLs removed) from tweet text."""
    dynamic_alt = repair_broken_urls(raw_text)
    dynamic_alt = repair_broken_mentions(dynamic_alt)
    dynamic_alt = strip_line_edge_whitespace(dynamic_alt)
    dynamic_alt = remove_trailing_ellipsis_line(dynamic_alt)
    dynamic_alt = dynamic_alt.replace("\n", " ").strip()
    dynamic_alt = re.sub(r"https?://\S+", "", dynamic_alt).strip()
    if len(dynamic_alt) > 150:
        dynamic_alt = dynamic_alt[:147] + "..."
    elif not dynamic_alt:
        dynamic_alt = "Attached video or image from tweet"
    return dynamic_alt


def build_video_embed(video_blob, alt_text):
    """Wrap an uploaded video blob in a video embed; None if SDK is too old."""
    try:
        return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None


def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Log into X with Playwright (reusing saved session state when valid)
    and scrape up to SCRAPE_TWEET_LIMIT tweets from target_handle's profile.

    Returns a list of ScrapedTweet; empty list if login fails.
    """
    tweets = []
    state_file = "twitter_browser_state.json"   # Playwright storage_state cache
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"]
        )
        clean_ua = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.7632.6 Safari/537.36"
        )
        context = None
        needs_login = True
        # Try to reuse a previously saved session before logging in again.
        if os.path.exists(state_file):
            logging.info("✅ Found existing browser state. Attempting to bypass login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080},
                storage_state=state_file
            )
            page = context.new_page()
            page.goto("https://x.com/home")
            time.sleep(4)
            if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
                logging.info("✅ Session is valid!")
                needs_login = False
            else:
                logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
                context.close()
                os.remove(state_file)
        if needs_login:
            logging.info("🚀 Launching fresh browser for automated Twitter login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080}
            )
            page = context.new_page()
            try:
                page.goto("https://x.com")
                sign_in_button = page.get_by_text("Sign in", exact=True)
                sign_in_button.wait_for(state="visible", timeout=15000)
                sign_in_button.click(force=True)
                page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000)
                logging.info(f"👤 Entering username: {username}...")
                time.sleep(1)
                username_input = page.locator('input[autocomplete="username"]')
                username_input.wait_for(state="visible", timeout=15000)
                username_input.click(force=True)
                # Type with per-key delay to look less like automation.
                username_input.press_sequentially(username, delay=100)
                page.locator('button:has-text("Next")').first.click(force=True)
                # Next screen is either the password field or a security challenge.
                page.wait_for_selector(
                    'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
                    timeout=15000
                )
                time.sleep(1)
                if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
                    logging.warning("🛡️ Security challenge detected! Entering email/phone...")
                    page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
                    sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first
                    if sec_next.is_visible():
                        sec_next.click(force=True)
                    else:
                        page.keyboard.press("Enter")
                    page.wait_for_selector('input[name="password"]', timeout=15000)
                    time.sleep(1)
                logging.info("🔑 Entering password...")
                page.fill('input[name="password"]', password)
                page.locator('span:has-text("Log in")').first.click()
                page.wait_for_url("**/home", timeout=20000)
                time.sleep(3)
                # Persist cookies/localStorage so future runs skip login.
                context.storage_state(path=state_file)
                logging.info("✅ Login successful. Browser state saved.")
            except Exception as e:
                take_error_screenshot(page, "login_failed")
                logging.error(f"❌ Login failed: {e}")
                browser.close()
                return []
        logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
        page = context.new_page()
        page.goto(f"https://x.com/{target_handle}")
        try:
            page.wait_for_selector("article", timeout=20000)
            time.sleep(3)
            articles = page.locator("article").all()
            logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...")
            for article in articles[:SCRAPE_TWEET_LIMIT]:
                try:
                    time_el = article.locator("time").first
                    if not time_el.is_visible():
                        continue
                    created_at = time_el.get_attribute("datetime")
                    tweet_url = None
                    # The permalink is the anchor wrapping the timestamp.
                    time_link = article.locator("a:has(time)").first
                    if time_link.is_visible():
                        href = time_link.get_attribute("href")
                        if href:
                            tweet_url = f"https://x.com{href}" if href.startswith("/") else href
                    text_locator = article.locator('[data-testid="tweetText"]').first
                    text = text_locator.inner_text() if text_locator.is_visible() else ""
                    media_urls = []
                    photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
                    for img in photo_locators:
                        src = img.get_attribute("src")
                        if src:
                            # Request the large variant instead of the preview size.
                            src = re.sub(r"&name=\w+", "&name=large", src)
                            media_urls.append((src, "photo"))
                    video_locators = article.locator('[data-testid="videoPlayer"]').all()
                    if video_locators:
                        # Videos are marked by tweet URL; the real media URL is
                        # captured later by extract_video_url_from_tweet_page.
                        media_urls.append((tweet_url or "", "video"))
                    tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url))
                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
                    continue
        except Exception as e:
            take_error_screenshot(page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")
        browser.close()
    return tweets


def extract_video_url_from_tweet_page(context, tweet_url):
    """Open the tweet page and sniff network responses for a playable video URL.

    Prefers an HLS (.m3u8) playlist over a direct MP4; audio-only MP4 segments
    are filtered out. Returns the URL or None.
    """
    page = context.new_page()
    best_m3u8_url = None
    best_video_mp4_url = None
    seen_urls = set()

    def is_audio_only_mp4(url, content_type):
        # Heuristics over URL path and content-type for audio-only renditions.
        url_l = url.lower()
        content_type_l = content_type.lower()
        return (
            "/aud/" in url_l
            or "/audio/" in url_l
            or "mp4a" in url_l
            or ("audio/" in content_type_l and "video/" not in content_type_l)
        )

    def handle_response(response):
        # Network sniffer: remembers the first HLS playlist and first video MP4.
        nonlocal best_m3u8_url, best_video_mp4_url
        try:
            url = response.url
            if url in seen_urls:
                return
            seen_urls.add(url)
            url_l = url.lower()
            content_type = response.headers.get("content-type", "")
            content_type_l = content_type.lower()
            if ".m4s" in url_l:   # fragment segments are useless on their own
                return
            if (
                ".m3u8" in url_l
                or "application/vnd.apple.mpegurl" in content_type_l
                or "application/x-mpegurl" in content_type_l
            ):
                if best_m3u8_url is None:
                    best_m3u8_url = url
                    logging.info(f"📺 Found HLS playlist URL: {url}")
                return
            if ".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l:
                if is_audio_only_mp4(url, content_type):
                    logging.info(f"🔇 Ignoring audio-only MP4: {url}")
                    return
                if best_video_mp4_url is None:
                    best_video_mp4_url = url
                    logging.info(f"🎥 Found VIDEO MP4 URL: {url}")
                return
        except Exception as e:
            logging.debug(f"Response parsing error: {e}")

    page.on("response", handle_response)

    def current_best():
        # HLS playlist wins over direct MP4 when both were seen.
        return best_m3u8_url or best_video_mp4_url

    try:
        logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
        page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(3)
        player = page.locator('[data-testid="videoPlayer"]').first
        if player.count() > 0:
            try:
                player.scroll_into_view_if_needed(timeout=5000)
            except Exception:
                pass
            try:
                player.click(force=True, timeout=5000)
                logging.info("▶️ Clicked video player")
            except Exception as e:
                logging.info(f"⚠️ First player click failed: {e}")
        else:
            logging.warning("⚠️ No video player locator found on tweet page")
        # Poll up to 12s for the sniffer to catch a media response.
        for _ in range(12):
            if current_best():
                break
            time.sleep(1)
        if not current_best() and player.count() > 0:
            logging.info("🔁 No media URL found yet, retrying player interaction...")
            try:
                player.click(force=True, timeout=5000)
                time.sleep(2)
            except Exception as e:
                logging.info(f"⚠️ Retry click failed: {e}")
            try:
                page.keyboard.press("Space")   # Space toggles playback
                time.sleep(1)
            except Exception:
                pass
            for _ in range(8):
                if current_best():
                    break
                time.sleep(1)
        selected_url = current_best()
        if selected_url:
            logging.info(f"✅ Selected media URL for download: {selected_url}")
        else:
            logging.warning(f"⚠️ No playable media URL detected on tweet page: {tweet_url}")
        return selected_url
    except Exception as e:
        logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}")
        return None
    finally:
        page.close()


# NOTE(review): definition below continues past the end of this chunk; the
# visible portion is reproduced unchanged.
def download_and_crop_video(video_url, output_path):
    temp_input = output_path.replace(".mp4", "_source.mp4")
    temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4")
    temp_output = output_path.replace(".mp4", "_compressed.mp4")
    try:
        logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}")
        video_url_l = video_url.lower()
        if ".m3u8" in video_url_l:
            logging.info("📺 Using HLS ffmpeg mode")
            download_cmd = [
                "ffmpeg", "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        else:
            logging.info("🎥 Using direct MP4 ffmpeg mode")
            download_cmd = [
                "ffmpeg", "-y",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        download_result = subprocess.run(download_cmd, capture_output=True, text=True)
        if download_result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{download_result.stderr}")
            return None
        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded video source file is missing or empty.")
            return None
        logging.info(f"✅ Video downloaded: {temp_input}")
        video_clip = VideoFileClip(temp_input)
        duration = float(video_clip.duration) if video_clip.duration else 0
        if duration <= 0:
            video_clip.close()
            logging.error("❌ Downloaded video has invalid or unknown duration.")
            return None
        end_time = min(VIDEO_MAX_DURATION_SECONDS, duration)
        if hasattr(video_clip, "subclipped"):
            cropped_clip = video_clip.subclipped(0, end_time)
        else:
            cropped_clip = video_clip.subclip(0, end_time)
        cropped_clip.write_videofile(
            temp_trimmed,
            codec="libx264",
            audio_codec="aac",
            preset="veryfast",
            bitrate="1800k",
            audio_bitrate="128k",
            logger=None
        )
        video_clip.close()
        cropped_clip.close()
        if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0:
            logging.error("❌ Trimmed video output is missing or empty.")
            return None
        trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024)
        logging.info(f"📦 Trimmed video size before
compression: {trimmed_size_mb:.2f} MB") compress_cmd = [ "ffmpeg", "-y", "-i", temp_trimmed, "-vf", "scale='min(720,iw)':-2", "-c:v", "libx264", "-preset", "veryfast", "-crf", "30", "-maxrate", "1800k", "-bufsize", "3600k", "-c:a", "aac", "-b:a", "128k", "-movflags", "+faststart", temp_output, ] compress_result = subprocess.run(compress_cmd, capture_output=True, text=True) if compress_result.returncode != 0: logging.error(f"โŒ ffmpeg compression failed:\n{compress_result.stderr}") return None if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0: logging.error("โŒ Compressed video output is missing or empty.") return None final_size_mb = os.path.getsize(temp_output) / (1024 * 1024) logging.info(f"โœ… Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)") os.replace(temp_output, output_path) logging.info(f"โœ… Final video ready: {output_path}") return output_path except Exception as e: logging.error(f"โŒ Error processing video: {repr(e)}") return None finally: for path in [temp_input, temp_trimmed, temp_output]: if os.path.exists(path): try: os.remove(path) except Exception: pass def candidate_matches_existing_bsky(candidate, recent_bsky_posts): candidate_non_x_urls = candidate["canonical_non_x_urls"] candidate_text_media_key = candidate["text_media_key"] candidate_normalized_text = candidate["normalized_text"] for existing in recent_bsky_posts: existing_non_x_urls = existing["canonical_non_x_urls"] if ( candidate_non_x_urls and candidate_non_x_urls == existing_non_x_urls and candidate_normalized_text == existing["normalized_text"] ): return True, "bsky:normalized_text_plus_non_x_urls" if candidate_text_media_key == existing["text_media_key"]: return True, "bsky:text_media_fingerprint" if candidate_normalized_text == existing["normalized_text"]: return True, "bsky:normalized_text" return False, None def sync_feeds(args): logging.info("๐Ÿ”„ Starting sync cycle...") try: state = load_state(STATE_PATH) tweets = 
scrape_tweets_via_playwright( args.twitter_username, args.twitter_password, args.twitter_email, args.twitter_handle ) if not tweets: logging.warning("โš ๏ธ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.") return bsky_client = create_bsky_client( args.bsky_base_url, args.bsky_handle, args.bsky_password ) recent_bsky_posts = get_recent_bsky_posts( bsky_client, args.bsky_handle, limit=DEDUPE_BSKY_LIMIT ) logging.info(f"๐Ÿง  Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.") logging.info(f"๐Ÿง  Local state currently tracks {len(state.get('posted_tweets', {}))} posted items.") too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS) logging.info(f"๐Ÿ•’ Will ignore tweets older than: {too_old_cutoff}") candidate_tweets = [] for tweet in reversed(tweets): try: tweet_time = arrow.get(tweet.created_on) if tweet_time < too_old_cutoff: logging.info(f"โญ๏ธ Skipping old tweet from {tweet_time}") continue prepared_text = prepare_post_text(tweet.text) normalized_text = normalize_post_text(prepared_text) if not normalized_text: logging.info(f"โญ๏ธ Skipping empty/blank tweet from {tweet_time}") continue media_fingerprint = build_media_fingerprint(tweet) text_media_key = build_text_media_key(normalized_text, media_fingerprint) ordered_non_x_urls = extract_ordered_non_x_urls(prepared_text) canonical_non_x_urls = set(ordered_non_x_urls) candidate_tweets.append({ "tweet": tweet, "tweet_time": tweet_time, "raw_text": prepared_text, "normalized_text": normalized_text, "media_fingerprint": media_fingerprint, "text_media_key": text_media_key, "canonical_tweet_url": canonicalize_tweet_url(tweet.tweet_url), "canonical_non_x_urls": canonical_non_x_urls, "ordered_non_x_urls": ordered_non_x_urls, "looks_like_title_plus_url": looks_like_title_plus_url_post(prepared_text), }) except Exception as e: logging.warning(f"โš ๏ธ Failed to prepare candidate tweet: {e}") logging.info(f"๐Ÿงช Prepared {len(candidate_tweets)} candidate 
tweets for duplicate comparison.") tweets_to_post = [] for candidate in candidate_tweets: is_dup_state, reason_state = candidate_matches_state(candidate, state) if is_dup_state: logging.info(f"โญ๏ธ Skipping candidate due to local state duplicate match on: {reason_state}") continue is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts) if is_dup_bsky: logging.info(f"โญ๏ธ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}") continue tweets_to_post.append(candidate) logging.info(f"๐Ÿ“ฌ {len(tweets_to_post)} tweets remain after duplicate filtering.") if not tweets_to_post: logging.info("โœ… No new tweets need posting after duplicate comparison.") return new_posts = 0 browser_state_file = "twitter_browser_state.json" with sync_playwright() as p, httpx.Client() as media_http_client: browser = p.chromium.launch( headless=True, args=["--disable-blink-features=AutomationControlled"] ) context_kwargs = { "user_agent": ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/145.0.7632.6 Safari/537.36" ), "viewport": {"width": 1920, "height": 1080}, } if os.path.exists(browser_state_file): context_kwargs["storage_state"] = browser_state_file context = browser.new_context(**context_kwargs) for candidate in tweets_to_post: tweet = candidate["tweet"] tweet_time = candidate["tweet_time"] raw_text = candidate["raw_text"] logging.info(f"๐Ÿ“ Posting missing tweet from {tweet_time} to Bluesky...") rich_text = make_rich(raw_text) dynamic_alt = build_dynamic_alt(raw_text) image_embeds = [] video_embed = None external_embed = None media_upload_failures = [] if tweet.media: for media in tweet.media: if media.type == "photo": blob = get_blob_from_url(media.media_url_https, bsky_client, media_http_client) if blob: image_embeds.append( models.AppBskyEmbedImages.Image( alt=dynamic_alt, image=blob ) ) else: media_upload_failures.append(f"photo:{media.media_url_https}") elif media.type 
== "video": if not tweet.tweet_url: logging.warning("โš ๏ธ Tweet has video marker but no tweet URL. Skipping video.") media_upload_failures.append("video:no_tweet_url") continue temp_video_path = "temp_video.mp4" try: real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url) if not real_video_url: logging.warning(f"โš ๏ธ Could not resolve playable video URL for {tweet.tweet_url}") media_upload_failures.append(f"video:resolve_failed:{tweet.tweet_url}") continue cropped_video_path = download_and_crop_video(real_video_url, temp_video_path) if not cropped_video_path: logging.warning(f"โš ๏ธ Video download/crop failed for {tweet.tweet_url}") media_upload_failures.append(f"video:crop_failed:{tweet.tweet_url}") continue video_blob = get_blob_from_file(cropped_video_path, bsky_client) if not video_blob: logging.warning(f"โš ๏ธ Video upload blob failed for {tweet.tweet_url}") media_upload_failures.append(f"video:upload_failed:{tweet.tweet_url}") continue video_embed = build_video_embed(video_blob, dynamic_alt) if not video_embed: media_upload_failures.append(f"video:embed_failed:{tweet.tweet_url}") finally: if os.path.exists(temp_video_path): os.remove(temp_video_path) if not video_embed and not image_embeds: candidate_url = None if candidate.get("ordered_non_x_urls"): candidate_url = candidate["ordered_non_x_urls"][0] if candidate.get("looks_like_title_plus_url"): logging.info(f"๐Ÿ”— Detected title+URL post style. Using URL for external card: {candidate_url}") else: logging.info(f"๐Ÿ”— Text-only post with non-X URL. 
Using first URL for external card: {candidate_url}") if candidate_url: external_embed = build_external_link_embed( candidate_url, bsky_client, media_http_client, fallback_title="Link" ) if external_embed: logging.info(f"โœ… Built external link card for URL: {candidate_url}") else: logging.info(f"โ„น๏ธ Could not build external link card metadata for URL: {candidate_url}") try: post_result = None post_mode = "text" if video_embed: post_result = bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"]) post_mode = "video" elif image_embeds: embed = models.AppBskyEmbedImages.Main(images=image_embeds) post_result = bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"]) post_mode = f"images:{len(image_embeds)}" elif external_embed: post_result = bsky_client.send_post(text=rich_text, embed=external_embed, langs=["ca"]) post_mode = "external_link_card" else: post_result = bsky_client.send_post(text=rich_text, langs=["ca"]) post_mode = "text_only" bsky_uri = getattr(post_result, "uri", None) remember_posted_tweet(state, candidate, bsky_uri=bsky_uri) state = prune_state(state, max_entries=5000) save_state(state, STATE_PATH) recent_bsky_posts.insert(0, { "uri": bsky_uri, "text": raw_text, "normalized_text": candidate["normalized_text"], "canonical_non_x_urls": candidate["canonical_non_x_urls"], "media_fingerprint": candidate["media_fingerprint"], "text_media_key": candidate["text_media_key"], "created_at": arrow.utcnow().isoformat(), }) recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] new_posts += 1 if media_upload_failures: logging.warning( f"โœ… Posted tweet to Bluesky with degraded media mode ({post_mode}). " f"Failed media items: {media_upload_failures}" ) else: logging.info(f"โœ… Posted new tweet to Bluesky with mode {post_mode}: {raw_text}") time.sleep(5) except Exception as e: logging.error(f"โŒ Failed to post tweet to Bluesky: {e}") browser.close() logging.info(f"โœ… Sync complete. 
Posted {new_posts} new updates.") except Exception as e: logging.error(f"โŒ Error during sync cycle: {e}") def main(): load_dotenv() parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync") parser.add_argument("--twitter-username", help="Your Twitter login username") parser.add_argument("--twitter-password", help="Your Twitter login password") parser.add_argument("--twitter-email", help="Your Twitter email for security challenges") parser.add_argument("--twitter-handle", help="The Twitter account to scrape") parser.add_argument("--bsky-handle", help="Your Bluesky handle") parser.add_argument("--bsky-password", help="Your Bluesky app password") parser.add_argument("--bsky-base-url", help="Bluesky/ATProto PDS base URL, e.g. https://eurosky.social") args = parser.parse_args() args.twitter_username = args.twitter_username or os.getenv("TWITTER_USERNAME") args.twitter_password = args.twitter_password or os.getenv("TWITTER_PASSWORD") args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL") args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username args.bsky_base_url = args.bsky_base_url if args.bsky_base_url else DEFAULT_BSKY_BASE_URL missing_args = [] if not args.twitter_username: missing_args.append("--twitter-username") if not args.twitter_password: missing_args.append("--twitter-password") if not args.bsky_handle: missing_args.append("--bsky-handle") if not args.bsky_password: missing_args.append("--bsky-password") if missing_args: logging.error(f"โŒ Missing credentials! You forgot to provide: {', '.join(missing_args)}") return logging.info(f"๐Ÿค– Bot started. Will check @{args.twitter_handle}") logging.info(f"๐ŸŒ Posting destination base URL: {args.bsky_base_url}") sync_feeds(args) logging.info("๐Ÿค– Bot finished.") if __name__ == "__main__": main()