Files
post2bsky/twitter2bsky_daemon.py
2026-04-09 16:44:08 +02:00

1784 lines
62 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import arrow
import hashlib
import io
import json
import logging
import re
import httpx
import time
import os
import subprocess
from urllib.parse import urlparse
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip
from bs4 import BeautifulSoup
from PIL import Image
# --- Configuration ---
LOG_PATH = "twitter2bsky.log"                 # daemon log file (also echoed to console)
STATE_PATH = "twitter2bsky_state.json"        # persisted dedupe/state memory
SCRAPE_TWEET_LIMIT = 30                       # max tweets parsed from the profile page per run
DEDUPE_BSKY_LIMIT = 30                        # how many recent Bluesky posts to fetch for dedupe
TWEET_MAX_AGE_DAYS = 3                        # presumably an age cutoff for reposting — confirm at call site
BSKY_TEXT_MAX_LENGTH = 275                    # truncation target for Bluesky post text
VIDEO_MAX_DURATION_SECONDS = 179              # presumably a video length cap — confirm at call site
MAX_VIDEO_UPLOAD_SIZE_MB = 45                 # MP4s larger than this are not uploaded (see get_blob_from_file)
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024         # size ceiling for link-card thumbnails before compression
EXTERNAL_THUMB_MAX_DIMENSION = 1200           # first-pass resize bound for link-card thumbnails (px)
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40          # lowest JPEG quality tried when compressing thumbnails
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5              # attempts per blob upload (rate-limit aware)
BSKY_BLOB_UPLOAD_BASE_DELAY = 10              # seconds; exponential backoff base for rate limits
BSKY_BLOB_UPLOAD_MAX_DELAY = 300              # seconds; backoff/reset-wait ceiling
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3         # extra retries for transient network/server errors
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15          # seconds; linear delay step for transient retries
MEDIA_DOWNLOAD_TIMEOUT = 30                   # seconds; HTTP timeout for media downloads
LINK_METADATA_TIMEOUT = 10                    # seconds; HTTP timeout for link metadata fetches
DEFAULT_BSKY_BASE_URL = "https://bsky.social" # default PDS when no base URL is configured
# --- Logging Setup ---
# Log to LOG_PATH and to the console simultaneously.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
    level=logging.INFO,
)
# --- Custom Classes ---
class ScrapedMedia:
    """One media attachment of a scraped tweet (photo or video)."""

    def __init__(self, url, media_type="photo"):
        # Attribute names mirror what downstream posting code expects.
        self.type = media_type
        self.media_url_https = url
class ScrapedTweet:
    """Lightweight container for one scraped tweet and its media list."""

    def __init__(self, created_on, text, media_urls, tweet_url=None):
        self.created_on = created_on
        self.text = text
        self.tweet_url = tweet_url
        # media_urls is a sequence of (url, media_type) pairs.
        self.media = [ScrapedMedia(u, t) for (u, t) in media_urls]
# --- Helpers ---
def take_error_screenshot(page, error_msg):
    """Save a timestamped PNG of the current browser page for debugging."""
    logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
    shot_name = f"screenshot_{time.strftime('%Y%m%d_%H%M%S')}.png"
    page.screenshot(path=shot_name)
    logging.info(f"📸 Screenshot saved as: {shot_name}")
def is_valid_url(url):
    """Return True when a HEAD request to *url* answers with a non-5xx status."""
    try:
        resp = httpx.head(url, timeout=5, follow_redirects=True)
    except Exception:
        # Any network/parse failure counts as invalid.
        return False
    return resp.status_code < 500
def strip_trailing_url_punctuation(url):
    """Drop trailing whitespace/punctuation (incl. '…') left over from prose around a URL."""
    if not url:
        return url
    trailing_junk = r"[\s…\.,;:!?)\]\"']+$"
    return re.sub(trailing_junk, "", url.strip())
def repair_broken_urls(text):
    """Re-join URLs that the scraper captured broken across line wraps.

    Pass 1 glues a bare scheme ("https://") to the text on the following
    line.  The loop then repeatedly (until a fixpoint) appends the next
    newline- or whitespace-separated run of URL-safe characters onto a
    preceding URL.
    """
    if not text:
        return text
    original = text
    # Scheme dangling at a line break: "https://\nexample.com" -> "https://example.com".
    text = re.sub(r"(https?://)\s*[\r\n]+\s*", r"\1", text, flags=re.IGNORECASE)
    prev_text = None
    # Iterate to a fixpoint so multi-line wraps merge piece by piece.
    while prev_text != text:
        prev_text = text
        # URL followed by a newline and a URL-ish token: join across the break.
        text = re.sub(
            r"((?:https?://|www\.)[^\s<>\"]*)[\r\n]+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)",
            r"\1\2",
            text,
            flags=re.IGNORECASE
        )
        # NOTE(review): this pass joins across ANY whitespace, so ordinary
        # words that follow a URL on the same line ("https://a.com more text")
        # look like they would be folded into the URL too — confirm intended.
        text = re.sub(
            r"((?:https?://|www\.)[^\s<>\"]*)\s+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)",
            r"\1\2",
            text,
            flags=re.IGNORECASE
        )
    if text != original:
        logging.info("🔧 Repaired broken URL wrapping in scraped text")
    return text
def repair_broken_mentions(text):
    """Merge lines that the scraper split around @mentions back into one line.

    A line consisting solely of "@handle" is glued onto the previous
    non-empty line (or starts a new merged line), and subsequent non-blank,
    non-mention lines are pulled up onto it until a blank line or another
    mention-only line is hit.  Blank lines are preserved as paragraph breaks.
    """
    if not text:
        return text
    lines = text.splitlines()
    result = []
    i = 0
    changed = False
    def is_mention_only_line(s):
        # Exactly one @handle (letters/digits/underscore) and nothing else.
        return bool(re.fullmatch(r"@[A-Za-z0-9_]+", s.strip()))
    def is_blank_line(s):
        return not s.strip()
    while i < len(lines):
        current = lines[i]
        stripped = current.strip()
        if is_blank_line(current):
            # Keep paragraph separators.
            result.append("")
            i += 1
            continue
        if is_mention_only_line(current):
            # Glue a lone "@handle" line onto the previous non-empty line.
            if result and result[-1].strip():
                result[-1] = result[-1].rstrip() + " " + stripped
                changed = True
            else:
                result.append(stripped)
            i += 1
            # Pull following continuation lines up onto the merged line.
            while i < len(lines):
                next_line = lines[i]
                next_stripped = next_line.strip()
                if is_blank_line(next_line):
                    break
                if is_mention_only_line(next_line):
                    break
                result[-1] = result[-1].rstrip() + " " + next_stripped
                changed = True
                i += 1
                if i < len(lines) and is_blank_line(lines[i]):
                    break
            continue
        if i + 1 < len(lines) and is_mention_only_line(lines[i + 1]):
            # Text line followed by a lone mention: merge forward.
            merged = stripped + " " + lines[i + 1].strip()
            changed = True
            i += 2
            # Continue absorbing lines until a blank or another mention-only line.
            while i < len(lines):
                next_line = lines[i]
                next_stripped = next_line.strip()
                if is_blank_line(next_line):
                    break
                if is_mention_only_line(next_line):
                    break
                merged = merged.rstrip() + " " + next_stripped
                changed = True
                i += 1
                if i < len(lines) and is_blank_line(lines[i]):
                    break
            result.append(merged)
            continue
        # Ordinary line: keep (stripped) as-is.
        result.append(stripped)
        i += 1
    new_text = "\n".join(result)
    if changed:
        logging.info("🔧 Repaired broken mention wrapping in scraped text")
    return new_text
def strip_line_edge_whitespace(text):
    """Strip leading/trailing whitespace from every line, keeping line breaks."""
    if not text:
        return text
    raw_lines = text.splitlines()
    trimmed = [ln.strip() for ln in raw_lines]
    if any(before != after for before, after in zip(raw_lines, trimmed)):
        logging.info("🔧 Stripped leading/trailing whitespace from scraped text lines")
    return "\n".join(trimmed)
def remove_trailing_ellipsis_line(text):
    """Drop trailing blank lines and bare "..." lines, then strip the result.

    NOTE(review): only the ASCII "..." form is removed; a Unicode "…"
    line is kept — confirm that is intended for scraped tweet text.
    """
    if not text:
        return text
    kept = text.splitlines()
    while kept and kept[-1].strip() in ("...", ""):
        del kept[-1]
    return "\n".join(kept).strip()
def clean_post_text(text):
    """Run the full scraped-text cleanup pipeline and return the result."""
    cleaned = (text or "").strip()
    pipeline = (
        repair_broken_urls,
        repair_broken_mentions,
        strip_line_edge_whitespace,
        remove_trailing_ellipsis_line,
    )
    for stage in pipeline:
        cleaned = stage(cleaned)
    return cleaned.strip()
def clean_url(url):
    """Collapse whitespace, trim trailing punctuation, and return the URL only if reachable."""
    candidate = re.sub(r"\s+", "", url.strip())
    candidate = strip_trailing_url_punctuation(candidate)
    return candidate if is_valid_url(candidate) else None
def canonicalize_url(url):
    """Return a trimmed, punctuation-stripped form of *url*, or None when empty."""
    if not url:
        return None
    return strip_trailing_url_punctuation(url.strip())
def canonicalize_tweet_url(url):
    """Normalize any x.com/twitter.com status URL to https://x.com/<handle>/status/<id>."""
    if not url:
        return None
    stripped = url.strip()
    status_pattern = r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)"
    m = re.search(status_pattern, stripped, re.IGNORECASE)
    if m is None:
        # Not a recognizable status URL: fall back to a lowercased copy.
        return stripped.lower()
    return f"https://x.com/{m.group(1).lower()}/status/{m.group(2)}"
def is_x_or_twitter_domain(url):
    """True when *url*'s hostname belongs to X/Twitter."""
    x_hosts = ("x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com")
    try:
        host = urlparse(url).hostname
    except Exception:
        # Unparseable input is simply "not an X domain".
        return False
    return (host or "").lower() in x_hosts
def extract_urls_from_text(text):
    """Return every http(s) token found after URL line-wrap repair."""
    if not text:
        return []
    repaired_text = repair_broken_urls(text)
    return re.findall(r"https?://[^\s]+", repaired_text)
def extract_non_x_urls_from_text(text):
    """Return cleaned URLs from *text* that do not point at X/Twitter."""
    cleaned_urls = (strip_trailing_url_punctuation(u) for u in extract_urls_from_text(text))
    return [u for u in cleaned_urls if u and not is_x_or_twitter_domain(u)]
def extract_ordered_non_x_urls(text):
    """Canonical non-X URLs in first-seen order, de-duplicated."""
    # A dict preserves insertion order and de-duplicates in one pass.
    unique = {}
    for raw in extract_non_x_urls_from_text(text):
        canonical = canonicalize_url(raw)
        if canonical:
            unique.setdefault(canonical, None)
    return list(unique)
def looks_like_title_plus_url_post(text):
    """Detect a "headline + single trailing link" tweet shape.

    True when the repaired text has at least two non-empty lines, contains
    exactly one non-X URL overall, that single URL lives on the last line,
    and the last line itself starts with http(s).
    """
    if not text:
        return False
    repaired = strip_line_edge_whitespace(repair_broken_urls(text))
    lines = [ln for ln in (s.strip() for s in repaired.splitlines()) if ln]
    if len(lines) < 2:
        return False
    last = lines[-1]
    if not last.startswith(("http://", "https://")):
        return False
    return len(extract_ordered_non_x_urls(last)) == 1 and len(extract_ordered_non_x_urls(repaired)) == 1
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Truncate *text* to *max_length*, preferring a word boundary, appending "..."."""
    if len(text) <= max_length:
        return text
    clipped = text[:max_length - 3]
    cut = clipped.rfind(" ")
    # Fall back to a hard cut when there is no usable space to break on.
    return (clipped[:cut] if cut > 0 else clipped) + "..."
def prepare_post_text_for_bsky(full_clean_text, keep_url=None):
    """Fit cleaned tweet text into the Bluesky length budget.

    When the text is over BSKY_TEXT_MAX_LENGTH and *keep_url* matches a
    non-X URL inside the text, trim the prefix so that the URL survives at
    the end (so a link card can still be generated).  Otherwise fall back
    to plain word-boundary truncation.
    """
    text = (full_clean_text or "").strip()
    if not text:
        return text
    if len(text) <= BSKY_TEXT_MAX_LENGTH:
        return text
    if keep_url:
        canonical_keep = canonicalize_url(keep_url)
        urls = extract_ordered_non_x_urls(text)
        # Find the in-text URL whose canonical form matches keep_url.
        matched_url = None
        for url in urls:
            if canonicalize_url(url) == canonical_keep:
                matched_url = url
                break
        if matched_url and matched_url in text:
            idx = text.find(matched_url)
            prefix = text[:idx].rstrip()
            suffix = matched_url
            # Reserve room for the URL plus the joining space.
            reserve = len(suffix) + 1
            available = BSKY_TEXT_MAX_LENGTH - reserve
            # Only worth it when a meaningful prefix can remain.
            if available > 10:
                trimmed_prefix = prefix
                if len(trimmed_prefix) > available:
                    trimmed_prefix = trimmed_prefix[:available - 3]
                    last_space = trimmed_prefix.rfind(" ")
                    if last_space > 0:
                        trimmed_prefix = trimmed_prefix[:last_space] + "..."
                    else:
                        trimmed_prefix = trimmed_prefix + "..."
                final_text = f"{trimmed_prefix.rstrip()} {suffix}".strip()
                if len(final_text) <= BSKY_TEXT_MAX_LENGTH:
                    logging.info("🔗 Preserved non-X URL in final Bluesky text for card generation")
                    return final_text
    return truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH)
def normalize_post_text(text):
    """Lowercased, whitespace-collapsed form of cleaned text, used as a dedupe key."""
    if not text:
        return ""
    cleaned = clean_post_text(text).replace("\r", "\n")
    collapsed = re.sub(r"\s+", " ", cleaned)
    return collapsed.strip().lower()
def build_media_fingerprint(tweet):
    """Stable hash of a tweet's media set, ignoring per-fetch URL variant params."""
    if not tweet or not tweet.media:
        return "no-media"
    parts = []
    for item in tweet.media:
        kind = getattr(item, "type", "unknown")
        value = getattr(item, "media_url_https", "") or ""
        if kind == "photo":
            # Photo URLs embed variant params (name=, format=) that change per fetch.
            value = re.sub(r"[?&]name=\w+", "", value)
            value = re.sub(r"[?&]format=\w+", "", value)
        elif kind == "video":
            # Videos are identified by the canonical tweet URL instead.
            value = canonicalize_tweet_url(tweet.tweet_url or value or "")
        parts.append(f"{kind}:{value}")
    digest_input = "|".join(sorted(parts))
    return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
def build_bsky_media_fingerprint(post_view):
    """Stable hash of a Bluesky post's embed contents ("no-media" when none)."""
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"
        parts = []
        for img in getattr(embed, "images", None) or []:
            image_obj = getattr(img, "image", None)
            ref = getattr(image_obj, "ref", None) or getattr(image_obj, "cid", None) or str(image_obj)
            parts.append(f"photo:{ref}")
        video = getattr(embed, "video", None)
        if video:
            vref = getattr(video, "ref", None) or getattr(video, "cid", None) or str(video)
            parts.append(f"video:{vref}")
        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")
        if not parts:
            return "no-media"
        return hashlib.sha256("|".join(sorted(parts)).encode("utf-8")).hexdigest()
    except Exception as e:
        # Fingerprinting is best-effort; fall back to "no-media" on any surprise.
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
def build_text_media_key(normalized_text, media_fingerprint):
    """Combined dedupe key: sha256 over "<text>||<fingerprint>"."""
    combined = f"{normalized_text}||{media_fingerprint}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
def create_bsky_client(base_url, handle, password):
    """Build and log in an atproto Client against *base_url* (default PDS when falsy)."""
    pds_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"🔐 Connecting Bluesky client via base URL: {pds_url}")
    try:
        client = Client(base_url=pds_url)
    except TypeError:
        # Older atproto releases take no constructor argument; set the
        # attribute directly on whichever name this version exposes.
        logging.warning("⚠️ Your atproto Client does not accept base_url in constructor. Falling back.")
        client = Client()
        try:
            if hasattr(client, "base_url"):
                client.base_url = pds_url
            elif hasattr(client, "_base_url"):
                client._base_url = pds_url
        except Exception as e:
            logging.warning(f"⚠️ Could not apply custom base URL cleanly: {e}")
    client.login(handle, password)
    return client
def default_state():
    """Return a fresh, empty dedupe-state structure."""
    return dict(version=1, posted_tweets={}, posted_by_bsky_uri={}, updated_at=None)
def load_state(state_path=STATE_PATH):
    """Load dedupe state from disk, falling back to an empty structure on any problem."""
    if not os.path.exists(state_path):
        logging.info(f"🧠 No state file found at {state_path}. Starting with empty memory.")
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except Exception as e:
        logging.warning(f"⚠️ Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()
    if not isinstance(state, dict):
        logging.warning("⚠️ State file is invalid. Reinitializing.")
        return default_state()
    # Backfill any keys missing from older state files.
    for key, default in (("version", 1), ("posted_tweets", {}),
                         ("posted_by_bsky_uri", {}), ("updated_at", None)):
        state.setdefault(key, default)
    return state
def save_state(state, state_path=STATE_PATH):
    """Atomically persist *state* as JSON: write a temp file, then os.replace it."""
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        tmp_path = f"{state_path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(state, fh, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(tmp_path, state_path)
        logging.info(f"💾 State saved to {state_path}")
    except Exception as e:
        # Losing one save is survivable; crashing the daemon is not.
        logging.error(f"❌ Failed to save state file {state_path}: {e}")
def remember_posted_tweet(state, candidate, bsky_uri=None):
    """Record a posted candidate in state, keyed by tweet URL (or text+media hash)."""
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    state_key = canonical_tweet_url or f"textmedia:{candidate['text_media_key']}"
    tweet = candidate["tweet"]
    state["posted_tweets"][state_key] = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "full_clean_text": candidate.get("full_clean_text", candidate["raw_text"]),
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []),
        "bsky_uri": bsky_uri,
        "tweet_created_on": tweet.created_on,
        "tweet_url": tweet.tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }
    # Reverse index so a Bluesky URI can be mapped back to its state entry.
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key
def candidate_matches_state(candidate, state):
    """Check whether *candidate* was already posted; return (matched, reason)."""
    posted = state.get("posted_tweets", {})
    # Cheapest check first: direct key hit on the canonical tweet URL.
    url = candidate["canonical_tweet_url"]
    if url and url in posted:
        return True, "state:tweet_url"
    records = list(posted.values())
    key = candidate["text_media_key"]
    if any(rec.get("text_media_key") == key for rec in records):
        return True, "state:text_media_fingerprint"
    norm = candidate["normalized_text"]
    if any(rec.get("normalized_text") == norm for rec in records):
        return True, "state:normalized_text"
    return False, None
def prune_state(state, max_entries=5000):
    """Keep only the *max_entries* most recent records (by posted_at) in state."""
    posted = state.get("posted_tweets", {})
    if len(posted) <= max_entries:
        return state
    # Newest first; missing timestamps sort last (stable sort keeps ties ordered).
    ranked_keys = sorted(posted, key=lambda k: posted[k].get("posted_at") or "", reverse=True)
    keep = set(ranked_keys[:max_entries])
    state["posted_tweets"] = {k: v for k, v in posted.items() if k in keep}
    state["posted_by_bsky_uri"] = {
        uri: k for uri, k in state.get("posted_by_bsky_uri", {}).items() if k in keep
    }
    return state
def extract_urls_from_facets(record):
    """Collect link URIs from a Bluesky record's rich-text facets (best-effort)."""
    found = []
    try:
        for facet in getattr(record, "facets", None) or []:
            for feature in getattr(facet, "features", None) or []:
                uri = getattr(feature, "uri", None)
                if uri:
                    found.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")
    return found
def get_recent_bsky_posts(client, handle, limit=30):
    """Fetch the author's recent original Bluesky posts with precomputed dedupe keys.

    Skips reposts (feed items carrying a `reason`) and replies.  Each result
    dict bundles normalized text, canonical non-X URLs, a media fingerprint,
    and the combined text+media key used for duplicate detection.
    """
    recent_posts = []
    try:
        timeline = client.get_author_feed(handle, limit=limit)
        for item in timeline.feed:
            try:
                # A non-None reason marks a repost; skip those.
                if item.reason is not None:
                    continue
                record = item.post.record
                # Only original top-level posts matter for dedupe.
                if getattr(record, "reply", None) is not None:
                    continue
                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)
                # URLs come both from the raw text and from rich-text facets.
                urls = []
                urls.extend(extract_non_x_urls_from_text(text))
                urls.extend(extract_urls_from_facets(record))
                canonical_non_x_urls = set()
                for url in urls:
                    if not is_x_or_twitter_domain(url):
                        canonical = canonicalize_url(url)
                        if canonical:
                            canonical_non_x_urls.add(canonical)
                media_fingerprint = build_bsky_media_fingerprint(item.post)
                text_media_key = build_text_media_key(normalized_text, media_fingerprint)
                recent_posts.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalized_text,
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": text_media_key,
                    "created_at": getattr(record, "created_at", None),
                })
            except Exception as e:
                # One malformed item must not abort the whole dedupe fetch.
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
    except Exception as e:
        logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}")
    return recent_posts
def get_rate_limit_wait_seconds(error_obj, default_delay):
    """Derive a wait time from rate-limit reset headers; fall back to *default_delay*."""
    try:
        headers = getattr(error_obj, "headers", None)
        if headers:
            reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
            if reset_value:
                # Wait until the advertised reset (+1s slack), never less than
                # the caller's default, never more than the global ceiling.
                wait_seconds = max(int(reset_value) - int(time.time()) + 1, default_delay)
                return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY)
    except Exception:
        # Malformed header values just fall through to the default delay.
        pass
    return default_delay
def is_transient_blob_error(error_obj):
    """Heuristic: does the error's repr look like a retryable network/server failure?"""
    text = repr(error_obj)
    transient_signals = (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    )
    for signal in transient_signals:
        if signal in text:
            return True
    return False
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload *binary_data* as a Bluesky blob, retrying on rate limits and
    transient failures.

    Rate-limit (429) responses consume the main retry budget with
    exponential backoff (honoring reset headers); transient network/server
    errors use a separate, smaller linear-backoff budget.  Returns the
    uploaded blob, or None when all retries are exhausted or a
    non-retryable error occurs.
    """
    last_exception = None
    transient_attempts = 0
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            error_text = str(e)
            is_rate_limited = "429" in error_text or "RateLimitExceeded" in error_text
            if is_rate_limited:
                # Exponential backoff, capped, possibly extended by the
                # server's advertised reset time.
                backoff_delay = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(
                        f"⏳ Bluesky blob upload rate-limited for {media_label}. "
                        f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue
                else:
                    logging.warning(
                        f"❌ Exhausted blob upload retries for {media_label} after rate limiting: {repr(e)}"
                    )
                    break
            # Transient failures get their own small linear-backoff budget.
            if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
                transient_attempts += 1
                wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(
                    f"⏳ Transient blob upload failure for {media_label}: {repr(e)}. "
                    f"Transient retry {transient_attempts}/{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue
            # Non-retryable: log (including the response body when present) and give up.
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            if hasattr(e, "response") and e.response is not None:
                try:
                    logging.warning(f"Upload response status: {e.response.status_code}")
                    logging.warning(f"Upload response body: {e.response.text}")
                except Exception:
                    pass
            return None
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None
def get_blob_from_url(media_url, client, http_client):
    """Download *media_url* and upload it to Bluesky; return the blob or None."""
    try:
        response = http_client.get(media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True)
        if response.status_code != 200:
            logging.warning(f"Could not fetch media {media_url}: HTTP {response.status_code}")
            return None
        body = response.content
        if not body:
            logging.warning(f"Could not fetch media {media_url}: empty response body")
            return None
        return upload_blob_with_retry(client, body, media_label=media_url)
    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {repr(e)}")
        return None
def get_blob_from_file(file_path, client):
    """Upload a local media file to Bluesky, enforcing the MP4 size cap."""
    try:
        if not os.path.exists(file_path):
            logging.warning(f"Could not upload local file {file_path}: file does not exist")
            return None
        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        logging.info(f"📦 Uploading local file {file_path} ({file_size_mb:.2f} MB)")
        # Oversized videos would be rejected by the PDS; skip the upload entirely.
        if file_path.lower().endswith(".mp4") and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB:
            logging.warning(
                f"Could not upload local file {file_path}: "
                f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)"
            )
            return None
        with open(file_path, "rb") as fh:
            payload = fh.read()
        return upload_blob_with_retry(client, payload, media_label=file_path)
    except Exception as e:
        logging.warning(f"Could not upload local file {file_path}: {repr(e)}")
        if hasattr(e, "response") and e.response is not None:
            try:
                logging.warning(f"Upload response status: {e.response.status_code}")
                logging.warning(f"Upload response body: {e.response.text}")
            except Exception:
                pass
        return None
def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES):
    """Re-encode a thumbnail as JPEG until it fits under *max_bytes*.

    Strategy: resize to EXTERNAL_THUMB_MAX_DIMENSION if larger, then walk
    down a JPEG quality ladder; if still too big, progressively shrink the
    longest side (1000px..600px) with a second quality ladder.  Returns the
    compressed bytes, or None when no attempt fits (or decoding fails).
    """
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            # Force RGB so JPEG encoding works for palette/alpha sources.
            img = img.convert("RGB")
            width, height = img.size
            max_dim = max(width, height)
            if max_dim > EXTERNAL_THUMB_MAX_DIMENSION:
                scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim
                new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
                img = img.resize(new_size, Image.LANCZOS)
                logging.info(f"🖼️ Resized external thumb to {new_size[0]}x{new_size[1]}")
            # First pass: keep the dimensions, lower the JPEG quality stepwise.
            for quality in [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                out = io.BytesIO()
                img.save(out, format="JPEG", quality=quality, optimize=True, progressive=True)
                data = out.getvalue()
                logging.info(
                    f"🖼️ External thumb candidate size at JPEG quality {quality}: "
                    f"{len(data) / 1024:.2f} KB"
                )
                if len(data) <= max_bytes:
                    return data
            # Second pass: shrink dimensions too, with a lower quality ladder.
            for target_dim in [1000, 900, 800, 700, 600]:
                resized = img.copy()
                width, height = resized.size
                max_dim = max(width, height)
                if max_dim > target_dim:
                    scale = target_dim / max_dim
                    new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
                    resized = resized.resize(new_size, Image.LANCZOS)
                for quality in [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                    out = io.BytesIO()
                    resized.save(out, format="JPEG", quality=quality, optimize=True, progressive=True)
                    data = out.getvalue()
                    logging.info(
                        f"🖼️ External thumb resized to <= {target_dim}px at quality {quality}: "
                        f"{len(data) / 1024:.2f} KB"
                    )
                    if len(data) <= max_bytes:
                        return data
    except Exception as e:
        logging.warning(f"Could not compress external thumbnail: {repr(e)}")
    return None
def get_external_thumb_blob_from_url(image_url, client, http_client):
    """Download a link-card thumbnail, shrink it if needed, and upload it as a blob.

    Returns the uploaded blob, or None when the image cannot be fetched,
    compressed under EXTERNAL_THUMB_MAX_BYTES, or uploaded — the caller
    then posts the card without a thumbnail.
    """
    try:
        r = http_client.get(image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True)
        if r.status_code != 200:
            logging.warning(f"Could not fetch external thumb {image_url}: HTTP {r.status_code}")
            return None
        content = r.content
        if not content:
            logging.warning(f"Could not fetch external thumb {image_url}: empty body")
            return None
        original_size_kb = len(content) / 1024
        logging.info(f"🖼️ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)")
        upload_bytes = content
        if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES:
            logging.info(
                f"🖼️ External thumb exceeds safe limit "
                f"({original_size_kb:.2f} KB > {EXTERNAL_THUMB_MAX_BYTES / 1024:.2f} KB). Compressing..."
            )
            compressed = compress_external_thumb_to_limit(upload_bytes, EXTERNAL_THUMB_MAX_BYTES)
            if compressed:
                upload_bytes = compressed
                logging.info(f"✅ External thumb compressed to {len(upload_bytes) / 1024:.2f} KB")
            else:
                # Better to post the card without a thumb than to fail the post.
                logging.warning("⚠️ Could not compress external thumb to fit limit. Will omit thumbnail.")
                return None
        else:
            logging.info("✅ External thumb already within safe size limit.")
        blob = upload_blob_with_retry(client, upload_bytes, media_label=f"external-thumb:{image_url}")
        if blob:
            return blob
        logging.warning("⚠️ External thumb upload failed. Will omit thumbnail.")
        return None
    except Exception as e:
        logging.warning(f"Could not fetch/upload external thumb {image_url}: {repr(e)}")
        return None
def fetch_link_metadata(url, http_client):
    """Fetch a page and pull OpenGraph/meta title, description, and image URL.

    Returns a dict with "title", "description", and "image" keys, or an
    empty dict on any fetch/parse failure.
    """
    try:
        response = http_client.get(url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        title_tag = soup.find("meta", property="og:title") or soup.find("title")
        desc_tag = (
            soup.find("meta", property="og:description")
            or soup.find("meta", attrs={"name": "description"})
        )
        image_tag = (
            soup.find("meta", property="og:image")
            or soup.find("meta", attrs={"name": "twitter:image"})
        )
        # Meta tags carry the value in "content"; a bare <title> uses its text.
        if title_tag and title_tag.has_attr("content"):
            title = title_tag["content"]
        elif title_tag and title_tag.text:
            title = title_tag.text.strip()
        else:
            title = ""
        return {
            "title": title,
            "description": desc_tag["content"] if desc_tag and desc_tag.has_attr("content") else "",
            "image": image_tag["content"] if image_tag and image_tag.has_attr("content") else None,
        }
    except Exception as e:
        logging.warning(f"Could not fetch link metadata for {url}: {repr(e)}")
        return {}
def build_external_link_embed(url, client, http_client, fallback_title="Link"):
    """Build an AppBskyEmbedExternal link card for *url*, with optional thumbnail.

    Returns None when neither metadata nor a thumbnail could be obtained.
    """
    meta = fetch_link_metadata(url, http_client)
    thumb = None
    if meta.get("image"):
        thumb = get_external_thumb_blob_from_url(meta["image"], client, http_client)
    if thumb:
        logging.info("✅ External link card thumbnail prepared successfully")
    else:
        logging.info(" External link card will be posted without thumbnail")
    if not (meta.get("title") or meta.get("description") or thumb):
        return None
    external = models.AppBskyEmbedExternal.External(
        uri=url,
        title=meta.get("title") or fallback_title,
        description=meta.get("description") or "",
        thumb=thumb,
    )
    return models.AppBskyEmbedExternal.Main(external=external)
def make_rich(content):
    """Build a Bluesky rich-text TextBuilder from cleaned tweet text.

    Walks the text word by word: http(s) tokens become link facets
    (upgraded to https and validated), "#..." tokens become hashtag
    facets, everything else is plain text.  Line breaks and word spacing
    are reproduced between (but not after) elements.
    """
    text_builder = client_utils.TextBuilder()
    content = clean_post_text(content)
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        if not line.strip():
            # Preserve empty lines between paragraphs (but not a trailing one).
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue
        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                # Consecutive spaces split into empty tokens; re-emit the space.
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue
            if word.startswith("http://") or word.startswith("https://"):
                # Upgrade plain http and strip trailing prose punctuation.
                if word.startswith("http://"):
                    word = word.replace("http://", "https://", 1)
                word = strip_trailing_url_punctuation(word)
                clean_url_value = clean_url(word)
                if clean_url_value and is_valid_url(clean_url_value):
                    text_builder.link(clean_url_value, clean_url_value)
                else:
                    # Unreachable/invalid URL: keep it as plain text.
                    text_builder.text(word)
            elif word.startswith("#"):
                # Tag facet keeps the visible "#word" but indexes the bare tag.
                clean_tag = word[1:].rstrip(".,;:!?)'\"")
                text_builder.tag(word, clean_tag)
            else:
                text_builder.text(word)
            if i < len(words) - 1:
                text_builder.text(" ")
        if line_idx < len(lines) - 1:
            text_builder.text("\n")
    return text_builder
def build_dynamic_alt(raw_text):
    """Derive media alt text from tweet text: URLs removed, capped at 150 chars."""
    alt = clean_post_text(raw_text).replace("\n", " ").strip()
    alt = re.sub(r"https?://\S+", "", alt).strip()
    if len(alt) > 150:
        return alt[:147] + "..."
    # Fall back to a generic description when nothing usable remains.
    return alt or "Attached video or image from tweet"
def build_video_embed(video_blob, alt_text):
    """Wrap an uploaded video blob in an AppBskyEmbedVideo embed; None if unsupported."""
    try:
        return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        # Older atproto releases lack the video embed model entirely.
        logging.error("❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto.")
        return None
def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Scrape recent tweets from @target_handle with a logged-in Playwright browser.

    Reuses a saved browser storage state when it is still valid; otherwise
    performs a scripted login (handling the optional email/phone security
    challenge) and saves the new state.  Returns a list of ScrapedTweet
    (empty on login failure).
    """
    tweets = []
    # Persisted cookies/localStorage so subsequent runs can skip login.
    state_file = "twitter_browser_state.json"
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            # Hides the webdriver automation flag from the page.
            args=["--disable-blink-features=AutomationControlled"]
        )
        # Plain desktop-Chrome user agent to look like a normal browser.
        clean_ua = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.7632.6 Safari/537.36"
        )
        context = None
        needs_login = True
        # --- Try to reuse the saved session first ---
        if os.path.exists(state_file):
            logging.info("✅ Found existing browser state. Attempting to bypass login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080},
                storage_state=state_file
            )
            page = context.new_page()
            page.goto("https://x.com/home")
            time.sleep(4)
            # The compose button (or landing on /home) means the session works.
            if page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible() or "/home" in page.url:
                logging.info("✅ Session is valid!")
                needs_login = False
            else:
                logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
                context.close()
                os.remove(state_file)
        # --- Full scripted login when no valid session exists ---
        if needs_login:
            logging.info("🚀 Launching fresh browser for automated Twitter login...")
            context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080}
            )
            page = context.new_page()
            try:
                page.goto("https://x.com")
                sign_in_button = page.get_by_text("Sign in", exact=True)
                sign_in_button.wait_for(state="visible", timeout=15000)
                sign_in_button.click(force=True)
                page.wait_for_selector('h1:has-text("Sign in to X")', state="visible", timeout=25000)
                logging.info(f"👤 Entering username: {username}...")
                time.sleep(1)
                username_input = page.locator('input[autocomplete="username"]')
                username_input.wait_for(state="visible", timeout=15000)
                username_input.click(force=True)
                # Typed with a per-key delay to mimic human input.
                username_input.press_sequentially(username, delay=100)
                page.locator('button:has-text("Next")').first.click(force=True)
                # Either the password field or a security-challenge input appears next.
                page.wait_for_selector(
                    'input[name="password"], input[data-testid="ocfEnterTextTextInput"], input[name="text"]',
                    timeout=15000
                )
                time.sleep(1)
                if page.locator('input[data-testid="ocfEnterTextTextInput"]').is_visible() or page.locator('input[name="text"]').is_visible():
                    # X sometimes asks for the account email/phone before the password.
                    logging.warning("🛡️ Security challenge detected! Entering email/phone...")
                    page.fill('input[data-testid="ocfEnterTextTextInput"], input[name="text"]', email)
                    sec_next = page.locator('[data-testid="ocfEnterTextNextButton"], span:has-text("Next")').first
                    if sec_next.is_visible():
                        sec_next.click(force=True)
                    else:
                        page.keyboard.press("Enter")
                    page.wait_for_selector('input[name="password"]', timeout=15000)
                    time.sleep(1)
                logging.info("🔑 Entering password...")
                page.fill('input[name="password"]', password)
                page.locator('span:has-text("Log in")').first.click()
                page.wait_for_url("**/home", timeout=20000)
                time.sleep(3)
                # Persist cookies so the next run can skip this whole flow.
                context.storage_state(path=state_file)
                logging.info("✅ Login successful. Browser state saved.")
            except Exception as e:
                take_error_screenshot(page, "login_failed")
                logging.error(f"❌ Login failed: {e}")
                browser.close()
                return []
        # --- Scrape the target profile ---
        logging.info(f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets...")
        page = context.new_page()
        page.goto(f"https://x.com/{target_handle}")
        try:
            page.wait_for_selector("article", timeout=20000)
            time.sleep(3)
            articles = page.locator("article").all()
            logging.info(f"📊 Found {len(articles)} tweets on screen. Parsing up to {SCRAPE_TWEET_LIMIT}...")
            for article in articles[:SCRAPE_TWEET_LIMIT]:
                try:
                    # The <time> element provides the timestamp and (via its link) the URL.
                    time_el = article.locator("time").first
                    if not time_el.is_visible():
                        continue
                    created_at = time_el.get_attribute("datetime")
                    tweet_url = None
                    time_link = article.locator("a:has(time)").first
                    if time_link.is_visible():
                        href = time_link.get_attribute("href")
                        if href:
                            tweet_url = f"https://x.com{href}" if href.startswith("/") else href
                    text_locator = article.locator('[data-testid="tweetText"]').first
                    text = text_locator.inner_text() if text_locator.is_visible() else ""
                    media_urls = []
                    # Photos: upgrade the variant param to the "large" size.
                    photo_locators = article.locator('[data-testid="tweetPhoto"] img').all()
                    for img in photo_locators:
                        src = img.get_attribute("src")
                        if src:
                            src = re.sub(r"&name=\w+", "&name=large", src)
                            media_urls.append((src, "photo"))
                    # Videos: only flag presence here; the actual media URL is
                    # captured later from the individual tweet page.
                    video_locators = article.locator('[data-testid="videoPlayer"]').all()
                    if video_locators:
                        media_urls.append((tweet_url or "", "video"))
                    tweets.append(ScrapedTweet(created_at, text, media_urls, tweet_url=tweet_url))
                except Exception as e:
                    # One unparseable tweet must not abort the whole scrape.
                    logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
                    continue
        except Exception as e:
            take_error_screenshot(page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")
        browser.close()
    return tweets
def extract_video_url_from_tweet_page(context, tweet_url):
    """Open a tweet page in a new browser tab and sniff network traffic for a
    playable video URL (HLS playlist preferred over a direct MP4).

    Args:
        context: An authenticated Playwright BrowserContext.
        tweet_url: Full URL of the tweet expected to contain a video.

    Returns:
        The first captured .m3u8 URL if any, else the first video MP4 URL,
        else None. The temporary page is always closed before returning.
    """
    page = context.new_page()
    # Best candidates found so far; HLS (m3u8) wins over plain MP4 because it
    # usually points at the full adaptive stream rather than a single variant.
    best_m3u8_url = None
    best_video_mp4_url = None
    seen_urls = set()  # dedupe: a response URL is inspected at most once

    def is_audio_only_mp4(url, content_type):
        # Heuristic: Twitter serves separate audio tracks under /aud/ paths or
        # with mp4a codec markers / audio/* content types. Those are useless
        # on their own, so they must not be picked as "the video".
        url_l = url.lower()
        content_type_l = content_type.lower()
        return (
            "/aud/" in url_l or
            "/audio/" in url_l or
            "mp4a" in url_l or
            ("audio/" in content_type_l and "video/" not in content_type_l)
        )

    def handle_response(response):
        # Network sniffer registered via page.on("response", ...). Runs for
        # every response the page triggers; must never raise, hence the broad
        # try/except with debug-level logging.
        nonlocal best_m3u8_url, best_video_mp4_url
        try:
            url = response.url
            if url in seen_urls:
                return
            seen_urls.add(url)
            url_l = url.lower()
            content_type = response.headers.get("content-type", "")
            content_type_l = content_type.lower()
            # .m4s are individual DASH/HLS media segments, not playlists.
            if ".m4s" in url_l:
                return
            if (
                ".m3u8" in url_l or
                "application/vnd.apple.mpegurl" in content_type_l or
                "application/x-mpegurl" in content_type_l
            ):
                # Keep only the first playlist seen (first-wins policy).
                if best_m3u8_url is None:
                    best_m3u8_url = url
                    logging.info(f"📺 Found HLS playlist URL: {url}")
                return
            if ".mp4" in url_l or "video/mp4" in content_type_l or "audio/mp4" in content_type_l:
                if is_audio_only_mp4(url, content_type):
                    logging.info(f"🔇 Ignoring audio-only MP4: {url}")
                    return
                if best_video_mp4_url is None:
                    best_video_mp4_url = url
                    logging.info(f"🎥 Found VIDEO MP4 URL: {url}")
                return
        except Exception as e:
            logging.debug(f"Response parsing error: {e}")

    page.on("response", handle_response)

    def current_best():
        # Preference order: HLS playlist first, direct MP4 as fallback.
        return best_m3u8_url or best_video_mp4_url
    try:
        logging.info(f"🎬 Opening tweet page to capture video URL: {tweet_url}")
        page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(3)
        player = page.locator('[data-testid="videoPlayer"]').first
        if player.count() > 0:
            try:
                player.scroll_into_view_if_needed(timeout=5000)
            except Exception:
                pass
            try:
                # Clicking starts playback, which forces the media requests we
                # are sniffing for. force=True bypasses overlay hit-testing.
                player.click(force=True, timeout=5000)
                logging.info("▶️ Clicked video player")
            except Exception as e:
                logging.info(f"⚠️ First player click failed: {e}")
        else:
            logging.warning("⚠️ No video player locator found on tweet page")
        # Poll up to ~12s for the sniffer to capture something.
        for _ in range(12):
            if current_best():
                break
            time.sleep(1)
        if not current_best() and player.count() > 0:
            # Second attempt: re-click and also try the Space key (the player
            # responds to keyboard play/pause), then poll ~8s more.
            logging.info("🔁 No media URL found yet, retrying player interaction...")
            try:
                player.click(force=True, timeout=5000)
                time.sleep(2)
            except Exception as e:
                logging.info(f"⚠️ Retry click failed: {e}")
            try:
                page.keyboard.press("Space")
                time.sleep(1)
            except Exception:
                pass
            for _ in range(8):
                if current_best():
                    break
                time.sleep(1)
        selected_url = current_best()
        if selected_url:
            logging.info(f"✅ Selected media URL for download: {selected_url}")
        else:
            logging.warning(f"⚠️ No playable media URL detected on tweet page: {tweet_url}")
        return selected_url
    except Exception as e:
        logging.warning(f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}")
        return None
    finally:
        page.close()
def download_and_crop_video(video_url, output_path):
    """Download a tweet video, trim it to the Bluesky duration limit,
    compress it, and move the result to ``output_path``.

    Pipeline: ffmpeg stream-copy download (HLS or direct MP4) -> moviepy
    trim/re-encode to at most VIDEO_MAX_DURATION_SECONDS -> second ffmpeg
    pass (cap width at 720px, CRF 30, faststart) -> os.replace into place.

    Args:
        video_url: URL of an .m3u8 playlist or a direct .mp4 resource.
        output_path: Destination path for the final .mp4 file (must end in
            ".mp4"; the temp file names are derived from it).

    Returns:
        ``output_path`` on success, or None on any download/processing
        failure (details are logged).
    """
    temp_input = output_path.replace(".mp4", "_source.mp4")
    temp_trimmed = output_path.replace(".mp4", "_trimmed.mp4")
    temp_output = output_path.replace(".mp4", "_compressed.mp4")
    # Track the moviepy clips so they can be closed in `finally` even when an
    # exception fires mid-pipeline (previously they leaked file handles).
    video_clip = None
    cropped_clip = None
    try:
        logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}")
        video_url_l = video_url.lower()
        if ".m3u8" in video_url_l:
            # HLS needs an explicit protocol whitelist so ffmpeg is allowed
            # to follow the https/tls segment URLs inside the playlist.
            logging.info("📺 Using HLS ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        else:
            logging.info("🎥 Using direct MP4 ffmpeg mode")
            download_cmd = [
                "ffmpeg",
                "-y",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        download_result = subprocess.run(download_cmd, capture_output=True, text=True)
        if download_result.returncode != 0:
            logging.error(f"❌ ffmpeg download failed:\n{download_result.stderr}")
            return None
        if not os.path.exists(temp_input) or os.path.getsize(temp_input) == 0:
            logging.error("❌ Downloaded video source file is missing or empty.")
            return None
        logging.info(f"✅ Video downloaded: {temp_input}")
        video_clip = VideoFileClip(temp_input)
        duration = float(video_clip.duration) if video_clip.duration else 0
        if duration <= 0:
            logging.error("❌ Downloaded video has invalid or unknown duration.")
            return None
        end_time = min(VIDEO_MAX_DURATION_SECONDS, duration)
        # moviepy >= 2.0 renamed subclip() to subclipped(); support both.
        if hasattr(video_clip, "subclipped"):
            cropped_clip = video_clip.subclipped(0, end_time)
        else:
            cropped_clip = video_clip.subclip(0, end_time)
        cropped_clip.write_videofile(
            temp_trimmed,
            codec="libx264",
            audio_codec="aac",
            preset="veryfast",
            bitrate="1800k",
            audio_bitrate="128k",
            logger=None
        )
        if not os.path.exists(temp_trimmed) or os.path.getsize(temp_trimmed) == 0:
            logging.error("❌ Trimmed video output is missing or empty.")
            return None
        trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024)
        logging.info(f"📦 Trimmed video size before compression: {trimmed_size_mb:.2f} MB")
        compress_cmd = [
            "ffmpeg",
            "-y",
            "-i", temp_trimmed,
            "-vf", "scale='min(720,iw)':-2",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-crf", "30",
            "-maxrate", "1800k",
            "-bufsize", "3600k",
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            temp_output,
        ]
        compress_result = subprocess.run(compress_cmd, capture_output=True, text=True)
        if compress_result.returncode != 0:
            logging.error(f"❌ ffmpeg compression failed:\n{compress_result.stderr}")
            return None
        if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
            logging.error("❌ Compressed video output is missing or empty.")
            return None
        final_size_mb = os.path.getsize(temp_output) / (1024 * 1024)
        logging.info(f"✅ Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)")
        os.replace(temp_output, output_path)
        logging.info(f"✅ Final video ready: {output_path}")
        return output_path
    except Exception as e:
        logging.error(f"❌ Error processing video: {repr(e)}")
        return None
    finally:
        # Close readers first, then delete intermediates: on Windows an open
        # handle would make os.remove() fail, and closing also releases the
        # ffmpeg reader processes moviepy spawns.
        for clip in (cropped_clip, video_clip):
            if clip is not None:
                try:
                    clip.close()
                except Exception:
                    pass
        for path in (temp_input, temp_trimmed, temp_output):
            if os.path.exists(path):
                try:
                    os.remove(path)
                except Exception:
                    pass
def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """Check a prepared tweet candidate against recently fetched Bluesky posts.

    Three duplicate signals are evaluated in order of strength for every
    recent post: identical normalized text plus identical non-X URL sets,
    identical text+media fingerprint key, and identical normalized text alone.

    Args:
        candidate: Dict with "canonical_non_x_urls", "text_media_key",
            and "normalized_text" keys.
        recent_bsky_posts: Iterable of dicts carrying the same three keys.

    Returns:
        (True, reason_string) on the first match, otherwise (False, None).
    """
    urls = candidate["canonical_non_x_urls"]
    fingerprint_key = candidate["text_media_key"]
    norm_text = candidate["normalized_text"]
    for post in recent_bsky_posts:
        same_urls = bool(urls) and urls == post["canonical_non_x_urls"]
        if same_urls and norm_text == post["normalized_text"]:
            return True, "bsky:normalized_text_plus_non_x_urls"
        if fingerprint_key == post["text_media_key"]:
            return True, "bsky:text_media_fingerprint"
        if norm_text == post["normalized_text"]:
            return True, "bsky:normalized_text"
    return False, None
def sync_feeds(args):
    """Run one full sync cycle: scrape recent tweets, filter duplicates
    against local state and recent Bluesky posts, then cross-post the rest.

    Args:
        args: Parsed argparse namespace carrying twitter_* and bsky_*
            credentials plus target handles and the PDS base URL.

    Side effects: posts to Bluesky, mutates and saves the on-disk state
    file, launches a headless Chromium for video URL extraction. All
    failures are logged; the function never raises to the caller.
    """
    logging.info("🔄 Starting sync cycle...")
    try:
        state = load_state(STATE_PATH)
        # Phase 1: scrape the source profile via Playwright.
        tweets = scrape_tweets_via_playwright(
            args.twitter_username,
            args.twitter_password,
            args.twitter_email,
            args.twitter_handle
        )
        if not tweets:
            logging.warning("⚠️ No tweets found or failed to fetch. Skipping Bluesky sync for this cycle.")
            return
        bsky_client = create_bsky_client(
            args.bsky_base_url,
            args.bsky_handle,
            args.bsky_password
        )
        # Recent posts on the target account are the second dedupe source
        # (the first is the local state file).
        recent_bsky_posts = get_recent_bsky_posts(
            bsky_client,
            args.bsky_handle,
            limit=DEDUPE_BSKY_LIMIT
        )
        logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.")
        logging.info(f"🧠 Local state currently tracks {len(state.get('posted_tweets', {}))} posted items.")
        too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS)
        logging.info(f"🕒 Will ignore tweets older than: {too_old_cutoff}")
        # Phase 2: turn raw scraped tweets into candidate dicts carrying all
        # the precomputed text/URL/media fingerprints the dedupe logic needs.
        # reversed() so older tweets are posted first (chronological order).
        candidate_tweets = []
        for tweet in reversed(tweets):
            try:
                tweet_time = arrow.get(tweet.created_on)
                if tweet_time < too_old_cutoff:
                    logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
                    continue
                full_clean_text = clean_post_text(tweet.text)
                normalized_text = normalize_post_text(full_clean_text)
                if not normalized_text:
                    logging.info(f"⏭️ Skipping empty/blank tweet from {tweet_time}")
                    continue
                # URLs outside the x.com domain drive both dedupe matching
                # and the external-link-card fallback further down.
                ordered_non_x_urls = extract_ordered_non_x_urls(full_clean_text)
                canonical_non_x_urls = set(ordered_non_x_urls)
                primary_non_x_url = ordered_non_x_urls[0] if ordered_non_x_urls else None
                raw_text = prepare_post_text_for_bsky(full_clean_text, keep_url=primary_non_x_url)
                media_fingerprint = build_media_fingerprint(tweet)
                text_media_key = build_text_media_key(normalized_text, media_fingerprint)
                has_video = any(getattr(m, "type", None) == "video" for m in (tweet.media or []))
                has_photo = any(getattr(m, "type", None) == "photo" for m in (tweet.media or []))
                candidate_tweets.append({
                    "tweet": tweet,
                    "tweet_time": tweet_time,
                    "raw_text": raw_text,
                    "full_clean_text": full_clean_text,
                    "normalized_text": normalized_text,
                    "media_fingerprint": media_fingerprint,
                    "text_media_key": text_media_key,
                    "canonical_tweet_url": canonicalize_tweet_url(tweet.tweet_url),
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "ordered_non_x_urls": ordered_non_x_urls,
                    "primary_non_x_url": primary_non_x_url,
                    "looks_like_title_plus_url": looks_like_title_plus_url_post(full_clean_text),
                    "has_video": has_video,
                    "has_photo": has_photo,
                })
            except Exception as e:
                logging.warning(f"⚠️ Failed to prepare candidate tweet: {e}")
        logging.info(f"🧪 Prepared {len(candidate_tweets)} candidate tweets for duplicate comparison.")
        # Phase 3: drop candidates already posted (local state wins, then
        # the fuzzier recent-Bluesky-post comparison).
        tweets_to_post = []
        for candidate in candidate_tweets:
            is_dup_state, reason_state = candidate_matches_state(candidate, state)
            if is_dup_state:
                logging.info(f"⏭️ Skipping candidate due to local state duplicate match on: {reason_state}")
                continue
            is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
            if is_dup_bsky:
                logging.info(f"⏭️ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}")
                continue
            tweets_to_post.append(candidate)
        logging.info(f"📬 {len(tweets_to_post)} tweets remain after duplicate filtering.")
        if not tweets_to_post:
            logging.info("✅ No new tweets need posting after duplicate comparison.")
            return
        # Phase 4: post each remaining candidate. A browser context (reusing
        # the saved login session if present) is needed to resolve video URLs.
        new_posts = 0
        browser_state_file = "twitter_browser_state.json"
        with sync_playwright() as p, httpx.Client() as media_http_client:
            browser = p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"]
            )
            context_kwargs = {
                "user_agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/145.0.7632.6 Safari/537.36"
                ),
                "viewport": {"width": 1920, "height": 1080},
            }
            if os.path.exists(browser_state_file):
                context_kwargs["storage_state"] = browser_state_file
            context = browser.new_context(**context_kwargs)
            for candidate in tweets_to_post:
                tweet = candidate["tweet"]
                tweet_time = candidate["tweet_time"]
                raw_text = candidate["raw_text"]
                full_clean_text = candidate["full_clean_text"]
                logging.info(f"📝 Posting missing tweet from {tweet_time} to Bluesky...")
                rich_text = make_rich(raw_text)
                dynamic_alt = build_dynamic_alt(full_clean_text)
                # Embed priority: video > images > external link card > text.
                image_embeds = []
                video_embed = None
                external_embed = None
                media_upload_failures = []
                has_video = candidate.get("has_video", False)
                if has_video:
                    video_media = next((m for m in (tweet.media or []) if getattr(m, "type", None) == "video"), None)
                    if video_media:
                        if not tweet.tweet_url:
                            logging.warning("⚠️ Tweet has video marker but no tweet URL. Skipping video.")
                            media_upload_failures.append("video:no_tweet_url")
                        else:
                            # Resolve the real media URL from the tweet page,
                            # download/trim/compress it, then upload the blob.
                            temp_video_path = "temp_video.mp4"
                            try:
                                real_video_url = extract_video_url_from_tweet_page(context, tweet.tweet_url)
                                if not real_video_url:
                                    logging.warning(f"⚠️ Could not resolve playable video URL for {tweet.tweet_url}")
                                    media_upload_failures.append(f"video:resolve_failed:{tweet.tweet_url}")
                                else:
                                    cropped_video_path = download_and_crop_video(real_video_url, temp_video_path)
                                    if not cropped_video_path:
                                        logging.warning(f"⚠️ Video download/crop failed for {tweet.tweet_url}")
                                        media_upload_failures.append(f"video:crop_failed:{tweet.tweet_url}")
                                    else:
                                        video_blob = get_blob_from_file(cropped_video_path, bsky_client)
                                        if not video_blob:
                                            logging.warning(f"⚠️ Video upload blob failed for {tweet.tweet_url}")
                                            media_upload_failures.append(f"video:upload_failed:{tweet.tweet_url}")
                                        else:
                                            video_embed = build_video_embed(video_blob, dynamic_alt)
                                            if not video_embed:
                                                media_upload_failures.append(f"video:embed_failed:{tweet.tweet_url}")
                            finally:
                                if os.path.exists(temp_video_path):
                                    os.remove(temp_video_path)
                    if not video_embed:
                        # Deliberate policy: a video tweet whose video failed
                        # is posted without photos (no partial-media fallback).
                        logging.warning(
                            "⚠️ Tweet contains video, but video could not be posted. "
                            "Skipping photo fallback for this tweet."
                        )
                else:
                    # Photo-only tweets: upload each image as its own blob.
                    if tweet.media:
                        for media in tweet.media:
                            if media.type == "photo":
                                blob = get_blob_from_url(media.media_url_https, bsky_client, media_http_client)
                                if blob:
                                    image_embeds.append(
                                        models.AppBskyEmbedImages.Image(
                                            alt=dynamic_alt,
                                            image=blob
                                        )
                                    )
                                else:
                                    media_upload_failures.append(f"photo:{media.media_url_https}")
                if not video_embed and not image_embeds:
                    # No native media embed available: fall back to an
                    # external link card built from the first non-X URL.
                    candidate_url = candidate.get("primary_non_x_url")
                    if candidate_url:
                        if candidate.get("looks_like_title_plus_url"):
                            logging.info(f"🔗 Detected title+URL post style. Using URL for external card: {candidate_url}")
                        else:
                            logging.info(f"🔗 Using first non-X URL for external card: {candidate_url}")
                        external_embed = build_external_link_embed(
                            candidate_url,
                            bsky_client,
                            media_http_client,
                            fallback_title="Link"
                        )
                        if external_embed:
                            logging.info(f"✅ Built external link card for URL: {candidate_url}")
                        else:
                            logging.info(f" Could not build external link card metadata for URL: {candidate_url}")
                try:
                    post_result = None
                    post_mode = "text"
                    if video_embed:
                        post_result = bsky_client.send_post(text=rich_text, embed=video_embed, langs=["ca"])
                        post_mode = "video"
                    elif image_embeds:
                        embed = models.AppBskyEmbedImages.Main(images=image_embeds)
                        post_result = bsky_client.send_post(text=rich_text, embed=embed, langs=["ca"])
                        post_mode = f"images:{len(image_embeds)}"
                    elif external_embed:
                        post_result = bsky_client.send_post(text=rich_text, embed=external_embed, langs=["ca"])
                        post_mode = "external_link_card"
                    else:
                        post_result = bsky_client.send_post(text=rich_text, langs=["ca"])
                        post_mode = "text_only"
                    # Persist dedupe bookkeeping immediately after a
                    # successful post so a crash cannot cause a repost.
                    bsky_uri = getattr(post_result, "uri", None)
                    remember_posted_tweet(state, candidate, bsky_uri=bsky_uri)
                    state = prune_state(state, max_entries=5000)
                    save_state(state, STATE_PATH)
                    # Mirror the new post into the in-memory dedupe window so
                    # later candidates in this same cycle compare against it.
                    recent_bsky_posts.insert(0, {
                        "uri": bsky_uri,
                        "text": raw_text,
                        "normalized_text": candidate["normalized_text"],
                        "canonical_non_x_urls": candidate["canonical_non_x_urls"],
                        "media_fingerprint": candidate["media_fingerprint"],
                        "text_media_key": candidate["text_media_key"],
                        "created_at": arrow.utcnow().isoformat(),
                    })
                    recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
                    new_posts += 1
                    if media_upload_failures:
                        logging.warning(
                            f"✅ Posted tweet to Bluesky with degraded media mode ({post_mode}). "
                            f"Failed media items: {media_upload_failures}"
                        )
                    else:
                        logging.info(f"✅ Posted new tweet to Bluesky with mode {post_mode}: {raw_text}")
                    # Small pause between posts to stay gentle on the API.
                    time.sleep(5)
                except Exception as e:
                    logging.error(f"❌ Failed to post tweet to Bluesky: {e}")
            browser.close()
        logging.info(f"✅ Sync complete. Posted {new_posts} new updates.")
    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")
def main():
    """CLI entry point: merge flags with environment variables, validate
    credentials, and run a single sync cycle."""
    load_dotenv()
    parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync")
    parser.add_argument("--twitter-username", help="Your Twitter login username")
    parser.add_argument("--twitter-password", help="Your Twitter login password")
    parser.add_argument("--twitter-email", help="Your Twitter email for security challenges")
    parser.add_argument("--twitter-handle", help="The Twitter account to scrape")
    parser.add_argument("--bsky-handle", help="Your Bluesky handle")
    parser.add_argument("--bsky-password", help="Your Bluesky app password")
    parser.add_argument("--bsky-base-url", help="Bluesky/ATProto PDS base URL, e.g. https://eurosky.social")
    args = parser.parse_args()
    # Each missing CLI flag falls back to its environment-variable twin.
    env_fallbacks = {
        "twitter_username": "TWITTER_USERNAME",
        "twitter_password": "TWITTER_PASSWORD",
        "twitter_email": "TWITTER_EMAIL",
        "bsky_handle": "BSKY_HANDLE",
        "bsky_password": "BSKY_APP_PASSWORD",
    }
    for attr, env_name in env_fallbacks.items():
        if not getattr(args, attr):
            setattr(args, attr, os.getenv(env_name))
    # Scrape target defaults to the login username when not given anywhere.
    args.twitter_handle = args.twitter_handle or os.getenv("TWITTER_HANDLE") or args.twitter_username
    if not args.bsky_base_url:
        args.bsky_base_url = DEFAULT_BSKY_BASE_URL
    # Hard requirements; twitter_email and twitter_handle stay optional.
    required = [
        ("twitter_username", "--twitter-username"),
        ("twitter_password", "--twitter-password"),
        ("bsky_handle", "--bsky-handle"),
        ("bsky_password", "--bsky-password"),
    ]
    missing_args = [flag for attr, flag in required if not getattr(args, attr)]
    if missing_args:
        logging.error(f"❌ Missing credentials! You forgot to provide: {', '.join(missing_args)}")
        return
    logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
    logging.info(f"🌍 Posting destination base URL: {args.bsky_base_url}")
    sync_feeds(args)
    logging.info("🤖 Bot finished.")
# Script entry guard: run one sync cycle when executed directly.
if __name__ == "__main__":
    main()