# Scraped web-UI header, converted to comments so the file stays valid Python:
# File: post2bsky/twitter2bsky_daemon.py — 2026-04-13 19:17:21 +00:00
# 3623 lines, 121 KiB, Python (Raw / Blame / History view)
# Hosting-UI warning: this file contains invisible and confusable Unicode
# characters that are indistinguishable to humans but may be processed
# differently by a computer.
import argparse
import arrow
import hashlib
import html
import io
import json
import logging
import re
import httpx
import time
import os
import subprocess
import uuid
from urllib.parse import urlparse
from dotenv import load_dotenv
from atproto import Client, client_utils, models
from playwright.sync_api import sync_playwright
from moviepy import VideoFileClip
from bs4 import BeautifulSoup
from PIL import Image
# --- Configuration ---
# File locations for the daemon's log and its JSON dedupe state.
LOG_PATH = "twitter2bsky.log"
STATE_PATH = "twitter2bsky_state.json"
# How many tweets to scrape and how many recent Bluesky posts to check for dupes.
SCRAPE_TWEET_LIMIT = 30
DEDUPE_BSKY_LIMIT = 30
TWEET_MAX_AGE_DAYS = 3
# Bluesky post/text limits and defaults.
BSKY_TEXT_MAX_LENGTH = 275
DEFAULT_BSKY_LANGS = ["ca"]
VIDEO_MAX_DURATION_SECONDS = 179
MAX_VIDEO_UPLOAD_SIZE_MB = 45
# Image re-encoding budgets (bytes / pixels / JPEG quality floor).
BSKY_IMAGE_MAX_BYTES = 950 * 1024
BSKY_IMAGE_MAX_DIMENSION = 2000
BSKY_IMAGE_MIN_JPEG_QUALITY = 45
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024
EXTERNAL_THUMB_MAX_DIMENSION = 1200
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40
# Retry/backoff tuning for blob uploads and post submission (delays in seconds).
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
BSKY_SEND_POST_MAX_RETRIES = 3
BSKY_SEND_POST_BASE_DELAY = 5
BSKY_SEND_POST_MAX_DELAY = 60
# Network timeouts (seconds unless the name is suffixed _MS).
MEDIA_DOWNLOAD_TIMEOUT = 30
LINK_METADATA_TIMEOUT = 10
URL_RESOLVE_TIMEOUT = 12
PLAYWRIGHT_RESOLVE_TIMEOUT_MS = 30000
SUBPROCESS_TIMEOUT_SECONDS = 180
FFPROBE_TIMEOUT_SECONDS = 15 # FIX #6 — named constant for ffprobe probe timeout
DEFAULT_BSKY_BASE_URL = "https://bsky.social"
# FIX #11 — named constants replacing magic numbers scattered across the codebase
OG_TITLE_WAIT_TIMEOUT_MS = 7000 # ms to wait for og:title meta tag
PLAYWRIGHT_POST_GOTO_SLEEP_S = 2.0 # seconds to sleep after page.goto in resolvers
PLAYWRIGHT_IDLE_POLL_SLEEP_S = 0.8 # seconds between idle-state polls
PLAYWRIGHT_IDLE_POLL_ROUNDS = 4 # number of idle-state poll rounds
PLAYWRIGHT_RETRY_SLEEP_S = 2.0 # seconds to sleep before retry interaction
VIDEO_PLAYER_WAIT_ROUNDS = 8 # rounds waiting for video URL after first click
VIDEO_PLAYER_RETRY_ROUNDS = 5 # rounds waiting for video URL after retry click
URL_TAIL_MIN_PREFIX_CHARS = 35 # minimum prefix chars before URL for tail detection
URL_TAIL_MAX_LOOKBACK_CHARS = 120 # generous lookback window when hashtags follow URL
URL_TAIL_MAX_CLAUSE_DISTANCE = 180 # max chars a clause boundary may be from URL start
DYNAMIC_ALT_MAX_LENGTH = 150 # max chars for dynamic alt text
TRUNCATE_MIN_PREFIX_CHARS = 20 # min prefix length before inserting ellipsis
SHORT_TWEET_OG_FETCH_THRESHOLD = 35 # tweets shorter than this get og:title enrichment
ORPHAN_DIGIT_MAX_DIGITS = 3 # max digit count for orphaned-digit-line detection
SESSION_FILE_PERMISSIONS = 0o600 # FIX #14 — restrictive permissions for session cookie file
# --- Logging Setup ---
# Root logger writes both to the log file and to the console.
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.FileHandler(LOG_PATH, encoding="utf-8"), logging.StreamHandler()],
level=logging.INFO,
)
# --- Per-run caches for efficiency ---
# FIX #12 — caches are still module-level but now encapsulated in a class so they
# can be passed explicitly and are safe to reset between daemon cycles without
# relying on global mutation from arbitrary call sites.
class _RunCache:
def __init__(self):
self.og_title: dict = {}
self.url_resolution: dict = {}
self.url_validity: dict = {}
def clear(self):
self.og_title.clear()
self.url_resolution.clear()
self.url_validity.clear()
_cache = _RunCache()
def reset_caches():
_cache.clear()
# --- Custom Classes ---
class ScrapedMedia:
    """One media attachment scraped from a tweet (defaults to a photo)."""

    def __init__(self, url, media_type="photo"):
        self.type = media_type
        self.media_url_https = url
class ScrapedTweet:
    """A scraped tweet plus its media attachments.

    *media_urls* is a sequence of ``(url, media_type)`` pairs; each pair is
    wrapped in a :class:`ScrapedMedia`.
    """

    def __init__(self, created_on, text, media_urls, tweet_url=None, card_url=None, is_retweet=False):
        self.created_on = created_on
        self.text = text
        self.tweet_url = tweet_url
        self.card_url = card_url
        self.is_retweet = is_retweet
        self.media = [ScrapedMedia(u, kind) for (u, kind) in media_urls]
# --- Helpers ---
def take_error_screenshot(page, error_msg):
    """Save a timestamped PNG screenshot of *page*, logging *error_msg*."""
    logging.info(f"📸 Taking screenshot... Shot: {error_msg}")
    shot_path = f"screenshot_{time.strftime('%Y%m%d_%H%M%S')}.png"
    page.screenshot(path=shot_path)
    logging.info(f"📸 Screenshot saved as: {shot_path}")
def is_valid_url(url):
    """Cheap liveness probe: HEAD the URL following redirects and accept any
    status below 500. The verdict is memoized in the run cache."""
    if url in _cache.url_validity:
        return _cache.url_validity[url]
    try:
        probe = httpx.head(url, timeout=5, follow_redirects=True)
        verdict = probe.status_code < 500
    except Exception:
        verdict = False
    _cache.url_validity[url] = verdict
    return verdict
def strip_trailing_url_punctuation(url):
    """Drop trailing whitespace/punctuation (… . , ; : ! ? ) ] \" ') from a URL."""
    if not url:
        return url
    trimmed = url.strip()
    return re.sub(r"[\s…\.,;:!?)\]\"']+$", "", trimmed)
def split_concatenated_urls(text):
    """Insert a space between two URLs that were glued together."""
    if not text:
        return text
    separated = re.sub(r"(https?://[^\s]+?)(https?://)", r"\1 \2", text)
    if separated != text:
        logging.info("🔧 Split concatenated URLs in text")
    return separated
def repair_broken_urls(text):
    """Best-effort repair of URLs that the scraper broke across lines.

    Runs to a fixed point: each pass glues a URL back together with the
    URL-ish token that follows it, then re-splits any URLs that became
    concatenated in the process.
    """
    if not text:
        return text
    original = text
    text = split_concatenated_urls(text)
    # A scheme left dangling at a line end ("https://\n") is rejoined with
    # whatever follows on the next line.
    text = re.sub(r"(https?://)\s*[\r\n]+\s*", r"\1", text, flags=re.IGNORECASE)
    prev_text = None
    while prev_text != text:
        prev_text = text
        # Rejoin a URL split by a hard line break.
        text = re.sub(
            r"((?:https?://|www\.)[^\s<>\"]*)[\r\n]+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)",
            r"\1\2",
            text,
            flags=re.IGNORECASE,
        )
        # NOTE(review): this pass also glues a URL to the next
        # whitespace-separated URL-ish token, which can absorb ordinary words
        # that follow a URL in prose — confirm this aggressive merge is the
        # intended behavior for the scraped input.
        text = re.sub(
            r"((?:https?://|www\.)[^\s<>\"]*)\s+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)",
            r"\1\2",
            text,
            flags=re.IGNORECASE,
        )
        text = split_concatenated_urls(text)
    if text != original:
        logging.info("🔧 Repaired broken URL wrapping in scraped text")
    return text
def repair_broken_mentions(text):
    """Re-join @mentions that the scraper split onto their own lines.

    A line containing only '@handle' is merged into the previous non-blank
    line (or starts its own line), and following non-blank, non-mention
    lines are pulled up into the merged line until a blank line or another
    bare mention is reached. Blank lines are preserved as separators.
    """
    if not text:
        return text
    lines = text.splitlines()
    result = []
    i = 0
    changed = False
    def is_mention_only_line(s):
        # A line whose entire content is a single @handle token.
        return bool(re.fullmatch(r"@[A-Za-z0-9_]+", s.strip()))
    def is_blank_line(s):
        return not s.strip()
    while i < len(lines):
        current = lines[i]
        stripped = current.strip()
        if is_blank_line(current):
            result.append("")
            i += 1
            continue
        if is_mention_only_line(current):
            # Attach the bare mention to the previous non-blank output line
            # when one exists; otherwise let it start a new line.
            if result and result[-1].strip():
                result[-1] = result[-1].rstrip() + " " + stripped
                changed = True
            else:
                result.append(stripped)
            i += 1
            # Absorb continuation lines into the merged line.
            while i < len(lines):
                next_line = lines[i]
                next_stripped = next_line.strip()
                if is_blank_line(next_line):
                    break
                if is_mention_only_line(next_line):
                    break
                result[-1] = result[-1].rstrip() + " " + next_stripped
                changed = True
                i += 1
                if i < len(lines) and is_blank_line(lines[i]):
                    break
            continue
        if i + 1 < len(lines) and is_mention_only_line(lines[i + 1]):
            # Current line is followed by a bare mention: merge the pair and
            # keep absorbing continuation lines as above.
            merged = stripped + " " + lines[i + 1].strip()
            changed = True
            i += 2
            while i < len(lines):
                next_line = lines[i]
                next_stripped = next_line.strip()
                if is_blank_line(next_line):
                    break
                if is_mention_only_line(next_line):
                    break
                merged = merged.rstrip() + " " + next_stripped
                changed = True
                i += 1
                if i < len(lines) and is_blank_line(lines[i]):
                    break
            result.append(merged)
            continue
        result.append(stripped)
        i += 1
    new_text = "\n".join(result)
    if changed:
        logging.info("🔧 Repaired broken mention wrapping in scraped text")
    return new_text
def strip_line_edge_whitespace(text):
    """Strip leading/trailing whitespace from every line of *text*."""
    if not text:
        return text
    lines = text.splitlines()
    cleaned = [line.strip() for line in lines]
    if cleaned != lines:
        logging.info("🔧 Stripped leading/trailing whitespace from scraped text lines")
    return "\n".join(cleaned)
def remove_trailing_ellipsis_line(text):
    """Drop trailing blank lines and literal '...' lines, then strip."""
    if not text:
        return text
    lines = text.splitlines()
    while lines and lines[-1].strip() in {"...", ""}:
        del lines[-1]
    return "\n".join(lines).strip()
def remove_orphaned_digit_lines_before_hashtags(text):
    """
    Remove lines that contain only a number (e.g. '5') when they appear
    immediately before a line starting with a hashtag. These are typically
    scraped UI artifacts (image counts, engagement badges, etc.).
    """
    if not text:
        return text
    lines = text.splitlines()
    if len(lines) < 2:
        return text
    # FIX #11 — use named constant ORPHAN_DIGIT_MAX_DIGITS instead of literal 3
    digit_only = re.compile(rf"\d{{1,{ORPHAN_DIGIT_MAX_DIGITS}}}")
    kept = []
    dropped_any = False
    for idx, line in enumerate(lines):
        bare = line.strip()
        next_is_hashtag = (
            idx + 1 < len(lines) and lines[idx + 1].strip().startswith("#")
        )
        if bare and digit_only.fullmatch(bare) and next_is_hashtag:
            logging.info(f"🔧 Removing orphaned digit line '{bare}' before hashtag line")
            dropped_any = True
            continue
        kept.append(line)
    return "\n".join(kept) if dropped_any else text
def clean_post_text(text):
    """Run the full scraped-text cleanup pipeline and return the result."""
    cleaned = (text or "").strip()
    for transform in (
        repair_broken_urls,
        repair_broken_mentions,
        strip_line_edge_whitespace,
        remove_trailing_ellipsis_line,
        remove_orphaned_digit_lines_before_hashtags,
    ):
        cleaned = transform(cleaned)
    return cleaned.strip()
def clean_url(url):
    """Collapse whitespace, strip trailing punctuation, and probe the URL.

    Returns the cleaned URL, or None when the liveness probe rejects it.
    """
    candidate = re.sub(r"\s+", "", url.strip())
    candidate = strip_trailing_url_punctuation(candidate)
    return candidate if is_valid_url(candidate) else None
def canonicalize_url(url):
    """Trim the URL and drop trailing punctuation; falsy input yields None."""
    return strip_trailing_url_punctuation(url.strip()) if url else None
def normalize_urlish_token(token):
    """Normalize a URL-ish token to an absolute URL, or None if it isn't one.

    Bare 'www.' hosts get an https:// scheme prepended.
    """
    if not token:
        return None
    cleaned = strip_trailing_url_punctuation(token.strip())
    if not cleaned:
        return None
    if cleaned.startswith(("http://", "https://")):
        return cleaned
    return f"https://{cleaned}" if cleaned.startswith("www.") else None
def canonicalize_tweet_url(url):
    """Canonicalize a tweet URL to https://x.com/<handle>/status/<id>.

    Non-status URLs are simply lowercased; None stays None.
    """
    if not url:
        return None
    candidate = url.strip()
    status = re.search(
        r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)",
        candidate,
        re.IGNORECASE,
    )
    if status is None:
        return candidate.lower()
    return f"https://x.com/{status.group(1).lower()}/status/{status.group(2)}"
def extract_tweet_id(tweet_url):
    """Return the numeric status id embedded in a tweet URL, or None."""
    if not tweet_url:
        return None
    found = re.search(r"/status/(\d+)", tweet_url)
    return found.group(1) if found else None
def make_unique_video_temp_base(tweet_url=None):
    """Build a collision-resistant base name for temporary video files."""
    base = "_".join(
        (
            "temp_video",
            extract_tweet_id(tweet_url) or "unknown",
            str(int(time.time() * 1000)),
            uuid.uuid4().hex[:8],
        )
    )
    logging.info(f"🎞️ Using unique temp video base: {base}")
    return base
def remove_file_quietly(path):
    """Delete *path* if it exists; log (never raise) on failure."""
    if not path or not os.path.exists(path):
        return
    try:
        os.remove(path)
        logging.info(f"🧹 Removed temp file: {path}")
    except Exception as e:
        logging.warning(f"⚠️ Could not remove temp file {path}: {e}")
def is_x_or_twitter_domain(url):
    """True when the URL's hostname is an X/Twitter web host."""
    x_hosts = {
        "x.com",
        "www.x.com",
        "twitter.com",
        "www.twitter.com",
        "mobile.twitter.com",
    }
    try:
        absolute = normalize_urlish_token(url) or url
        host = (urlparse(absolute).hostname or "").lower()
    except Exception:
        return False
    return host in x_hosts
def is_tco_domain(url):
    """True when the URL points at Twitter's t.co shortener."""
    try:
        absolute = normalize_urlish_token(url) or url
        return (urlparse(absolute).hostname or "").lower() == "t.co"
    except Exception:
        return False
def is_external_non_x_url(url):
    """True for a URL that is neither t.co nor an X/Twitter host."""
    if not url:
        return False
    if is_tco_domain(url) or is_x_or_twitter_domain(url):
        return False
    return True
def extract_urls_from_text(text):
    """Find all http(s)/www URLs in *text* after repairing broken wrapping."""
    if not text:
        return []
    repaired = repair_broken_urls(text)
    return re.findall(r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+', repaired)
def extract_quoted_text_from_og_title(og_title):
    """Pull the quoted tweet body out of an og:title value.

    Tries the structured 'User on X: "text" / X' form first, then falls back
    to whatever sits between the outermost pair of double quotes.
    """
    if not og_title:
        return None
    title = html.unescape(og_title).strip()
    structured = re.search(r'on X:\s*"(?P<text>.*)"\s*/\s*X\s*$', title, flags=re.DOTALL)
    if structured:
        body = structured.group("text").strip()
        if body:
            return body
    open_q = title.find('"')
    close_q = title.rfind('"')
    if 0 <= open_q < close_q:
        body = title[open_q + 1 : close_q].strip()
        if body:
            return body
    return None
def should_fetch_og_title(tweet):
    """Decide whether the tweet page's og:title should be fetched to recover
    a fuller text than the scraper produced.

    Fetch when the scraped text is empty, carries an unresolved t.co link,
    looks truncated (ellipsis), or is suspiciously short.
    """
    text = clean_post_text(tweet.text or "")
    urls = extract_urls_from_text(text)
    if not text:
        return True
    if any(is_tco_domain(normalize_urlish_token(u) or u) for u in urls):
        return True
    # FIX — this condition previously read `"" in text`, which is True for
    # EVERY string, making the function always return True here and the
    # checks below dead code. The ellipsis character was almost certainly
    # lost as an "invisible" glyph in the source (the hosting UI warns about
    # invisible Unicode); restore the intended U+2026 truncation check.
    # TODO(review): confirm against file history.
    if "…" in text or text.endswith("..."):
        return True
    # FIX #11 — use named constant SHORT_TWEET_OG_FETCH_THRESHOLD instead of literal 35
    if len(text) < SHORT_TWEET_OG_FETCH_THRESHOLD:
        return True
    return False
def fetch_tweet_og_title_text(tweet_url):
    """Open the tweet page in headless Chromium and recover the tweet text
    from its og:title meta tag.

    Results — including failures, cached as None — are memoized in the run
    cache so repeated tweets don't re-launch a browser. Returns the cleaned
    extracted text or None.
    """
    if not tweet_url:
        return None
    if tweet_url in _cache.og_title:
        logging.info(f"⚡ Using cached og:title text for {tweet_url}")
        return _cache.og_title[tweet_url]
    browser = None
    browser_context = None # FIX #1 — renamed from 'context' to avoid collision
    page = None
    try:
        logging.info(f"🧾 Fetching og:title from tweet page: {tweet_url}")
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"],
            )
            browser_context = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/145.0.7632.6 Safari/537.36"
                ),
                viewport={"width": 1280, "height": 900},
            )
            page = browser_context.new_page()
            page.goto(
                tweet_url,
                wait_until="domcontentloaded",
                timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS,
            )
            try:
                # FIX #11 — use named constant OG_TITLE_WAIT_TIMEOUT_MS instead of literal 7000
                # Best-effort wait: the tag may be injected after DOMContentLoaded.
                page.wait_for_selector('meta[property="og:title"]', timeout=OG_TITLE_WAIT_TIMEOUT_MS)
            except Exception:
                pass
            og_title = (
                page.locator('meta[property="og:title"]')
                .first.get_attribute("content")
            )
            extracted = extract_quoted_text_from_og_title(og_title)
            if extracted:
                extracted = clean_post_text(extracted)
                _cache.og_title[tweet_url] = extracted
                logging.info(f"✅ Extracted tweet text from og:title for {tweet_url}")
                return extracted
            logging.info(f" No usable og:title text extracted for {tweet_url}")
            _cache.og_title[tweet_url] = None
            return None
    except Exception as e:
        logging.warning(
            f"⚠️ Could not extract og:title text from {tweet_url}: {repr(e)}"
        )
        try:
            if page:
                take_error_screenshot(page, "tweet_og_title_failed")
        except Exception:
            pass
        # Cache the failure so the same URL isn't retried this run.
        _cache.og_title[tweet_url] = None
        return None
    finally:
        # Best-effort teardown: close page, context, and browser independently.
        try:
            if page:
                page.close()
        except Exception:
            pass
        try:
            if browser_context:
                browser_context.close()
        except Exception:
            pass
        try:
            if browser:
                browser.close()
        except Exception:
            pass
def resolve_tco_with_httpx(url, http_client):
    """Follow t.co redirects via httpx; fall back to the cleaned input URL."""
    try:
        redirected = http_client.get(
            url, timeout=URL_RESOLVE_TIMEOUT, follow_redirects=True
        )
        landing = canonicalize_url(str(redirected.url))
        if landing:
            logging.info(f"🔗 Resolved t.co with httpx: {url} -> {landing}")
            return landing
    except Exception as e:
        logging.warning(f"⚠️ httpx t.co resolution failed for {url}: {repr(e)}")
    return canonicalize_url(url)
def resolve_tco_with_playwright(url):
    """Resolve a t.co link by visiting it in headless Chromium.

    Used when plain HTTP redirect-following isn't enough (e.g. JS-driven
    redirects). Returns the final page URL, or the cleaned input URL when
    resolution fails.
    """
    browser = None
    browser_context = None # FIX #1 — renamed from 'context'
    page = None
    try:
        logging.info(f"🌐 Resolving t.co with Playwright: {url}")
        with sync_playwright() as p:
            browser = p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"],
            )
            browser_context = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/145.0.7632.6 Safari/537.36"
                ),
                viewport={"width": 1280, "height": 900},
            )
            page = browser_context.new_page()
            try:
                page.goto(
                    url,
                    wait_until="domcontentloaded",
                    timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS,
                )
            except Exception as e:
                # Navigation errors are tolerated: the redirect may already
                # have happened before the failure, so keep going.
                logging.warning(
                    f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}"
                )
            # FIX #11 — use named constant PLAYWRIGHT_POST_GOTO_SLEEP_S
            time.sleep(PLAYWRIGHT_POST_GOTO_SLEEP_S)
            final_url = canonicalize_url(page.url)
            # FIX #11 — use named constants for poll rounds and sleep
            # Poll until the page settles on an external (non-X) URL.
            for _ in range(PLAYWRIGHT_IDLE_POLL_ROUNDS):
                if final_url and is_external_non_x_url(final_url):
                    break
                try:
                    page.wait_for_load_state("networkidle", timeout=2000)
                except Exception:
                    pass
                time.sleep(PLAYWRIGHT_IDLE_POLL_SLEEP_S)
                final_url = canonicalize_url(page.url)
            logging.info(f"🌐 Playwright final URL for {url}: {final_url}")
            return final_url
    except Exception as e:
        logging.warning(
            f"⚠️ Playwright t.co resolution failed for {url}: {repr(e)}"
        )
        try:
            if page:
                take_error_screenshot(page, "tco_resolve_failed")
        except Exception:
            pass
    finally:
        # Best-effort teardown of page, context and browser.
        try:
            if page:
                page.close()
        except Exception:
            pass
        try:
            if browser_context:
                browser_context.close()
        except Exception:
            pass
        try:
            if browser:
                browser.close()
        except Exception:
            pass
    # Reached only on failure: return the canonicalized original URL.
    return canonicalize_url(url)
def resolve_url_if_needed(url, http_client, allow_playwright_fallback=True):
    """Resolve *url* to its final destination, expanding t.co links.

    Resolution order: run cache → httpx redirect-follow → (optionally) a
    headless-browser visit. Non-t.co URLs are returned unchanged. Every
    outcome is memoized in the run cache.
    """
    if not url:
        return None
    normalized = normalize_urlish_token(url) or url
    cleaned = canonicalize_url(normalized)
    if not cleaned:
        return None
    if cleaned in _cache.url_resolution:
        logging.info(
            f"⚡ Using cached URL resolution: {cleaned} -> {_cache.url_resolution[cleaned]}"
        )
        return _cache.url_resolution[cleaned]
    if not is_tco_domain(cleaned):
        # Only t.co links need expansion.
        _cache.url_resolution[cleaned] = cleaned
        return cleaned
    resolved_http = resolve_tco_with_httpx(cleaned, http_client)
    if is_external_non_x_url(resolved_http):
        _cache.url_resolution[cleaned] = resolved_http
        return resolved_http
    if not allow_playwright_fallback:
        _cache.url_resolution[cleaned] = resolved_http
        return resolved_http
    resolved_browser = resolve_tco_with_playwright(cleaned)
    if is_external_non_x_url(resolved_browser):
        logging.info(
            f"✅ Resolved t.co via Playwright to external URL: {resolved_browser}"
        )
        _cache.url_resolution[cleaned] = resolved_browser
        return resolved_browser
    # Prefer the httpx result as long as it at least escaped t.co.
    if resolved_http and not is_tco_domain(resolved_http):
        _cache.url_resolution[cleaned] = resolved_http
        return resolved_http
    _cache.url_resolution[cleaned] = cleaned
    return cleaned
def extract_non_x_urls_from_text(text):
    """Collect URLs from *text*, keeping t.co links and external links but
    dropping anything hosted on X/Twitter itself."""
    kept = []
    for raw in extract_urls_from_text(text):
        normalized = normalize_urlish_token(raw)
        candidate = strip_trailing_url_punctuation(normalized or raw)
        if not candidate:
            continue
        if is_tco_domain(candidate) or not is_x_or_twitter_domain(candidate):
            kept.append(candidate)
    return kept
def extract_ordered_non_x_urls(text):
    """Canonical non-X URLs in first-seen order, deduplicated."""
    ordered = []
    seen = set()
    for raw in extract_non_x_urls_from_text(text):
        canonical = canonicalize_url(raw)
        if canonical and canonical not in seen:
            seen.add(canonical)
            ordered.append(canonical)
    return ordered
def extract_first_visible_non_x_url(text):
    """First canonical non-X URL appearing in *text*, or None."""
    for candidate in extract_non_x_urls_from_text(text or ""):
        canonical = canonicalize_url(candidate)
        if canonical:
            return canonical
    return None
def extract_first_resolved_external_url(
    text, http_client, allow_playwright_fallback=True
):
    """Resolve each URL found in *text*, in order, and return the first one
    that lands outside t.co/X — or None if no URL does."""
    for candidate in extract_non_x_urls_from_text(text or ""):
        resolved = resolve_url_if_needed(
            candidate,
            http_client,
            allow_playwright_fallback=allow_playwright_fallback,
        )
        if resolved and is_external_non_x_url(resolved):
            logging.info(f"✅ Selected resolved external URL for card: {resolved}")
            return resolved
    return None
def resolve_card_url(card_url, http_client):
    """
    Resolve a card URL (typically t.co) scraped from the tweet's link preview card.
    Returns the final external URL or None.
    """
    if not card_url:
        return None
    cleaned = canonicalize_url(card_url.strip())
    if not cleaned:
        return None
    if is_external_non_x_url(cleaned):
        logging.info(f"🔗 Card URL is already external: {cleaned}")
        return cleaned
    if is_tco_domain(cleaned):
        resolved = resolve_url_if_needed(
            cleaned, http_client, allow_playwright_fallback=True
        )
        if resolved and is_external_non_x_url(resolved):
            logging.info(f"🔗 Resolved card t.co URL: {cleaned} -> {resolved}")
            return resolved
        # A t.co URL that fails to resolve externally deliberately falls
        # through to the checks below (and is returned unchanged at the end
        # if it isn't an X/Twitter host).
    if is_x_or_twitter_domain(cleaned):
        logging.info(
            f" Card URL resolves to X/Twitter domain, ignoring: {cleaned}"
        )
        return None
    return cleaned
def sanitize_visible_urls_in_text(text, http_client, has_media=False):
    """Rewrite the URLs that remain visible in the post text.

    - X/Twitter links (raw or after resolution) are removed outright.
    - t.co links are expanded to their destination: httpx first, headless
      browser as fallback (skipped when the tweet has media and httpx
      already landed on an X URL).
    - Duplicate/X URLs sharing a line are collapsed, whitespace tidied.

    Returns a tuple ``(sanitized_text, first_external_url_or_None)``.
    """
    if not text:
        return text, None
    working = clean_post_text(text)
    url_pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+'
    urls = re.findall(url_pattern, working)
    if not urls:
        return working, None
    # raw matched token -> replacement text ("" removes the URL entirely)
    replacements = {}
    first_external_resolved = None
    for raw_url in urls:
        normalized = normalize_urlish_token(raw_url)
        cleaned = canonicalize_url(normalized or raw_url)
        if not cleaned:
            continue
        if is_x_or_twitter_domain(cleaned):
            replacements[raw_url] = ""
            logging.info(
                f"🧹 Removing X/Twitter URL from visible text: {cleaned}"
            )
            continue
        final_url = cleaned
        if is_tco_domain(cleaned):
            resolved_http_first = resolve_tco_with_httpx(cleaned, http_client)
            if is_external_non_x_url(resolved_http_first):
                final_url = resolved_http_first
                _cache.url_resolution[cleaned] = final_url
            else:
                if (
                    has_media
                    and resolved_http_first
                    and is_x_or_twitter_domain(resolved_http_first)
                ):
                    # With media attached the slow browser fallback is skipped;
                    # the X URL gets stripped by the check just below anyway.
                    final_url = resolved_http_first
                    _cache.url_resolution[cleaned] = final_url
                    logging.info(
                        f"⚡ Skipping Playwright t.co fallback because tweet has media "
                        f"and httpx already resolved to X/Twitter URL: {final_url}"
                    )
                else:
                    final_url = resolve_url_if_needed(
                        cleaned, http_client, allow_playwright_fallback=True
                    )
        if is_x_or_twitter_domain(final_url):
            replacements[raw_url] = ""
            logging.info(
                f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}"
            )
            continue
        # Keep the author's original (normalized) www. form when they wrote one.
        if normalized and normalized.startswith("https://www."):
            final_url = normalized
        elif normalized and normalized.startswith("http://www."):
            final_url = normalized
        if is_external_non_x_url(final_url) and not first_external_resolved:
            first_external_resolved = final_url
        replacements[raw_url] = final_url
    def replace_match(match):
        raw = match.group(0)
        return replacements.get(raw, raw)
    working = re.sub(url_pattern, replace_match, working)
    # Second pass: collapse duplicate URLs that ended up on the same line.
    deduped_lines = []
    for line in working.splitlines():
        line_urls = re.findall(url_pattern, line)
        if len(line_urls) > 1:
            prefix = re.sub(url_pattern, "", line).strip()
            kept_urls = []
            # FIX #4 — local set per line, not shared outer state
            seen_in_line: set = set()
            for url in line_urls:
                normalized = normalize_urlish_token(url) or url
                canonical = canonicalize_url(normalized)
                if not canonical:
                    continue
                if is_x_or_twitter_domain(canonical):
                    continue
                if canonical in seen_in_line:
                    continue
                seen_in_line.add(canonical)
                kept_urls.append(url)
            if prefix and kept_urls:
                rebuilt = prefix + " " + " ".join(kept_urls)
            elif prefix:
                rebuilt = prefix
            else:
                rebuilt = " ".join(kept_urls)
            deduped_lines.append(rebuilt.strip())
        else:
            cleaned_line = re.sub(r"\s{2,}", " ", line).strip()
            deduped_lines.append(cleaned_line)
    working = "\n".join(deduped_lines)
    working = re.sub(r"[ \t]+", " ", working)
    working = re.sub(r"\n{3,}", "\n\n", working).strip()
    return working, first_external_resolved
def build_effective_tweet_text(tweet, http_client):
    """Produce the final candidate text for the Bluesky post plus the primary
    external URL (used for the link card).

    Combines the scraped text, og:title recovery, visible-URL sanitization,
    and the tweet's preview-card URL. Returns a tuple
    ``(candidate_text, resolved_primary_external_url_or_None)``.
    """
    scraped_text = clean_post_text(tweet.text or "")
    has_media = bool(tweet.media)
    og_title_text = None
    if should_fetch_og_title(tweet):
        og_title_text = fetch_tweet_og_title_text(tweet.tweet_url)
    candidate_text = scraped_text
    if og_title_text:
        scraped_urls = extract_urls_from_text(scraped_text)
        og_urls = extract_urls_from_text(og_title_text)
        # Prefer og:title when it is at least as long as the scraped text, or
        # when it carries a URL that scraping lost.
        if len(og_title_text) >= len(scraped_text) or (
            og_urls and not scraped_urls
        ):
            candidate_text = og_title_text
            logging.info("🧾 Using og:title-derived tweet text as primary content")
    candidate_text, resolved_primary_external_url = sanitize_visible_urls_in_text(
        candidate_text,
        http_client,
        has_media=has_media,
    )
    candidate_text = clean_post_text(candidate_text)
    # --- Resolve the card_url scraped from the tweet's link preview ---
    resolved_card_url = resolve_card_url(
        getattr(tweet, "card_url", None), http_client
    )
    if resolved_card_url and is_external_non_x_url(resolved_card_url):
        if not resolved_primary_external_url:
            resolved_primary_external_url = resolved_card_url
            logging.info(
                f"🔗 Using resolved card URL as primary external URL: {resolved_card_url}"
            )
        elif resolved_primary_external_url != resolved_card_url:
            # The card URL wins over a URL found in the text body.
            logging.info(
                f" Card URL ({resolved_card_url}) differs from text URL "
                f"({resolved_primary_external_url}). Preferring card URL for external embed."
            )
            resolved_primary_external_url = resolved_card_url
    if not resolved_primary_external_url:
        # Last resort: resolve URLs straight out of the candidate text.
        resolved_primary_external_url = extract_first_resolved_external_url(
            candidate_text,
            http_client,
            allow_playwright_fallback=not has_media,
        )
    return candidate_text, resolved_primary_external_url
def remove_url_from_visible_text(text, url_to_remove):
    """Remove every occurrence of *url_to_remove* (matched canonically) from
    the visible text, tidying whitespace afterwards."""
    if not text or not url_to_remove:
        return text
    target = canonicalize_url(url_to_remove)
    rebuilt = []
    for line in text.splitlines():
        current = line
        for found in extract_urls_from_text(line):
            normalized = normalize_urlish_token(found) or found
            candidate = canonicalize_url(
                strip_trailing_url_punctuation(normalized)
            )
            if candidate == target:
                current = re.sub(re.escape(found), "", current)
        rebuilt.append(re.sub(r"[ \t]+", " ", current).strip())
    collapsed = "\n".join(rebuilt)
    collapsed = re.sub(r"[ \t]+", " ", collapsed)
    return re.sub(r"\n{3,}", "\n\n", collapsed).strip()
def looks_like_title_plus_url_post(text):
    """Heuristic: a multi-line post whose single URL sits alone on the last line."""
    if not text:
        return False
    repaired = strip_line_edge_whitespace(repair_broken_urls(text))
    lines = [ln for ln in (s.strip() for s in repaired.splitlines()) if ln]
    if len(lines) < 2:
        return False
    last_line = lines[-1]
    urls_in_last = extract_ordered_non_x_urls(last_line)
    urls_total = extract_ordered_non_x_urls(repaired)
    return (
        len(urls_in_last) == 1
        and len(urls_total) == 1
        and last_line.startswith(("http://", "https://", "www."))
    )
def looks_like_url_and_tag_tail(text, primary_non_x_url=None):
    """Heuristic: the text ends in a block that starts with the primary URL
    and is followed by at least one hashtag."""
    if not text or not primary_non_x_url:
        return False
    repaired = repair_broken_urls(text)
    anchor = repaired.find(primary_non_x_url)
    if anchor == -1:
        return False
    tail = repaired[anchor:].strip()
    if not tail.startswith(("http://", "https://", "www.")):
        return False
    return bool(re.search(r"(?:https?://|www\.)\S+.*#[^\s#]+", tail))
def find_tail_preservation_start(text, primary_non_x_url):
    """Find the index where the ending block worth preserving verbatim begins.

    The "tail" is the primary URL (plus any trailing hashtags), ideally
    extended back to the nearest clause boundary — sentence punctuation or a
    newline — so the preserved ending still reads naturally. Returns a
    character index into *text*, or None when the URL is absent.
    """
    if not text or not primary_non_x_url:
        return None
    url_pos = text.find(primary_non_x_url)
    if url_pos == -1:
        return None
    hashtag_match = re.search(r"\s#[^\s#]+", text[url_pos:])
    has_hashtag_after_url = hashtag_match is not None
    # Candidate start positions; the URL itself is always a candidate.
    candidates = [url_pos]
    # Clause boundaries (punctuation followed by whitespace) may start the tail.
    clause_patterns = [
        r"\.\s+",
        r":\s+",
        r";\s+",
        r"!\s+",
        r"\?\s+",
        r",\s+",
    ]
    before = text[:url_pos]
    for pattern in clause_patterns:
        for match in re.finditer(pattern, before):
            candidates.append(match.end())
    last_newline = before.rfind("\n")
    if last_newline != -1:
        candidates.append(last_newline + 1)
    if has_hashtag_after_url:
        # FIX #11 — use named constant URL_TAIL_MAX_LOOKBACK_CHARS instead of literal 120
        # Walk back to a whitespace boundary so the tail starts on a word.
        generous_start = max(0, url_pos - URL_TAIL_MAX_LOOKBACK_CHARS)
        while generous_start > 0 and text[generous_start] not in {" ", "\n"}:
            generous_start -= 1
        candidates.append(generous_start)
    # FIX #11 — use named constant URL_TAIL_MAX_CLAUSE_DISTANCE instead of literal 180
    reasonable_candidates = [
        c for c in candidates if 0 <= c < url_pos and (url_pos - c) <= URL_TAIL_MAX_CLAUSE_DISTANCE
    ]
    if reasonable_candidates:
        # Closest boundary wins, but keep at least a minimum of lead-in text.
        start = min(reasonable_candidates, key=lambda c: (url_pos - c))
        # FIX #11 — use named constant URL_TAIL_MIN_PREFIX_CHARS instead of literal 35
        if url_pos - start < URL_TAIL_MIN_PREFIX_CHARS:
            farther = [c for c in reasonable_candidates if url_pos - c >= URL_TAIL_MIN_PREFIX_CHARS]
            if farther:
                start = min(farther, key=lambda c: (url_pos - c))
        return start
    return url_pos
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Truncate to *max_length*, preferring a word boundary, appending '...'."""
    if len(text) <= max_length:
        return text
    head = text[: max_length - 3]
    cut = head.rfind(" ")
    # FIX #11 — use named constant TRUNCATE_MIN_PREFIX_CHARS instead of literal 0
    if cut > TRUNCATE_MIN_PREFIX_CHARS:
        head = head[:cut]
    return head + "..."
def truncate_text_preserving_tail(
    text, tail_start, max_length=BSKY_TEXT_MAX_LENGTH
):
    """Truncate *text* to *max_length* while keeping ``text[tail_start:]`` intact.

    The tail (URL + hashtags) is appended verbatim after an ellipsised
    prefix; when the tail alone exceeds the budget, it is trimmed from the
    left instead. Falls back to plain safe truncation when no usable tail
    exists or the result still overflows.
    """
    if (
        not text
        or tail_start is None
        or tail_start < 0
        or tail_start >= len(text)
    ):
        return truncate_text_safely(text, max_length)
    if len(text) <= max_length:
        return text
    tail = text[tail_start:].strip()
    if not tail:
        return truncate_text_safely(text, max_length)
    # Reserve room for the tail plus the "... " separator (4 chars).
    reserve = len(tail) + 4
    if reserve >= max_length:
        # Tail alone blows the budget: keep its end and drop a leading
        # partial word (a first space within 30 chars).
        shortened_tail = tail[-(max_length - 3) :].strip()
        first_space = shortened_tail.find(" ")
        if 0 <= first_space <= 30:
            shortened_tail = shortened_tail[first_space + 1 :].strip()
        return f"...{shortened_tail}"
    available_prefix = max_length - reserve
    prefix = text[:tail_start].rstrip()
    if len(prefix) > available_prefix:
        prefix = prefix[:available_prefix].rstrip()
        # Back up to a word boundary when a reasonable one exists.
        last_space = prefix.rfind(" ")
        if last_space > 20:
            prefix = prefix[:last_space].rstrip()
    final_text = f"{prefix}... {tail}".strip()
    final_text = re.sub(r"[ \t]+", " ", final_text)
    final_text = re.sub(r"\n{3,}", "\n\n", final_text).strip()
    if len(final_text) <= max_length:
        return final_text
    return truncate_text_safely(text, max_length)
def choose_final_visible_text(
    full_clean_text, primary_non_x_url=None, prefer_full_text_without_url=True
):
    """Pick the text that will actually appear in the Bluesky post.

    Preference order: the exact cleaned text when it fits; tail-preserving
    truncation around the primary URL; the full text with the long external
    URL removed (the URL survives as the external card); finally plain safe
    truncation.
    """
    text = clean_post_text(full_clean_text or "")
    if not text:
        return text
    if len(text) <= BSKY_TEXT_MAX_LENGTH:
        logging.info(
            "🟢 Original cleaned tweet text fits in Bluesky. Preserving exact text."
        )
        return text
    if primary_non_x_url:
        tail_start = find_tail_preservation_start(text, primary_non_x_url)
        if tail_start is not None:
            preserved = truncate_text_preserving_tail(
                text, tail_start, BSKY_TEXT_MAX_LENGTH
            )
            if preserved and len(preserved) <= BSKY_TEXT_MAX_LENGTH:
                logging.info(
                    "🔗 Preserving meaningful ending block with URL/hashtags in visible Bluesky text"
                )
                return preserved
        # Removing the URL is only attempted when the post doesn't END in a
        # URL+hashtag tail (which should stay visible).
        if prefer_full_text_without_url and not looks_like_url_and_tag_tail(
            text, primary_non_x_url
        ):
            text_without_url = remove_url_from_visible_text(
                text, primary_non_x_url
            ).strip()
            if (
                text_without_url
                and len(text_without_url) <= BSKY_TEXT_MAX_LENGTH
            ):
                logging.info(
                    "🔗 Keeping full visible text by removing long external URL from body and using external card"
                )
                return text_without_url
    truncated = truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH)
    logging.info("✂️ Falling back to safe truncation for visible Bluesky text")
    return truncated
def normalize_post_text(text):
    """Normalize text for dedupe comparison: clean, flatten whitespace, lowercase."""
    if not text:
        return ""
    flattened = clean_post_text(text).replace("\r", "\n")
    flattened = re.sub(r"\s+", " ", flattened).strip()
    return flattened.lower()
def build_media_fingerprint(tweet):
    """Stable, order-independent fingerprint of a tweet's media set.

    Photos are keyed by their URL minus the size/format query params; videos
    by the canonical tweet URL. Returns 'no-media' when there is nothing.
    """
    if not tweet or not tweet.media:
        return "no-media"
    parts = []
    for item in tweet.media:
        kind = getattr(item, "type", "unknown")
        url = getattr(item, "media_url_https", "") or ""
        key = url
        if kind == "photo":
            key = re.sub(r"[?&]name=\w+", "", key)
            key = re.sub(r"[?&]format=\w+", "", key)
        elif kind == "video":
            key = canonicalize_tweet_url(tweet.tweet_url or url or "")
        parts.append(f"{kind}:{key}")
    digest_input = "|".join(sorted(parts))
    return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
def build_bsky_media_fingerprint(post_view):
    """Stable fingerprint of a Bluesky post's embed (images/video/external),
    comparable with tweet fingerprints during dedupe.

    Returns 'no-media' when nothing is embedded or on any error.
    """
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"

        def _blob_key(blob):
            # Prefer the blob ref, then its cid, finally its repr.
            return (
                getattr(blob, "ref", None)
                or getattr(blob, "cid", None)
                or str(blob)
            )

        parts = []
        for img in getattr(embed, "images", None) or []:
            parts.append(f"photo:{_blob_key(getattr(img, 'image', None))}")
        video = getattr(embed, "video", None)
        if video:
            parts.append(f"video:{_blob_key(video)}")
        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")
        if not parts:
            return "no-media"
        digest_input = "|".join(sorted(parts))
        return hashlib.sha256(digest_input.encode("utf-8")).hexdigest()
    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"
def build_text_media_key(normalized_text, media_fingerprint):
    """Combined dedupe key over normalized text and media fingerprint."""
    combined = f"{normalized_text}||{media_fingerprint}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()
def create_bsky_client(base_url, handle, password):
    """Create and log in an atproto Client against a (possibly custom) PDS.

    Args:
        base_url: Bluesky/PDS base URL; falls back to DEFAULT_BSKY_BASE_URL
            (module-level config) when falsy. Whitespace and trailing
            slashes are stripped.
        handle: Bluesky account handle to log in as.
        password: App password for that account.

    Returns:
        A logged-in atproto Client instance.
    """
    normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(
        f"🔐 Connecting Bluesky client via base URL: {normalized_base_url}"
    )
    try:
        client = Client(base_url=normalized_base_url)
    except TypeError:
        # Older atproto versions don't accept base_url in the constructor;
        # build a default client and patch the attribute best-effort below.
        logging.warning(
            "⚠️ Your atproto Client does not accept base_url in constructor. Falling back."
        )
        client = Client()
        try:
            # The attribute name varies across atproto versions.
            if hasattr(client, "base_url"):
                client.base_url = normalized_base_url
            elif hasattr(client, "_base_url"):
                client._base_url = normalized_base_url
        except Exception as e:
            logging.warning(
                f"⚠️ Could not apply custom base URL cleanly: {e}"
            )
    client.login(handle, password)
    return client
# --- State Management ---
def default_state():
    """Return a fresh, empty persistence-state structure for the daemon."""
    return dict(
        version=1,
        posted_tweets={},
        posted_by_bsky_uri={},
        updated_at=None,
    )
def load_state(state_path=STATE_PATH):
    """Load persisted dedupe state from disk, reinitializing on any problem.

    A missing file, unreadable/invalid JSON, or a non-dict top-level value
    all fall back to a fresh default_state(). Missing keys in a valid state
    dict are filled in with their defaults.
    """
    if not os.path.exists(state_path):
        logging.info(
            f"🧠 No state file found at {state_path}. Starting with empty memory."
        )
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            loaded = json.load(f)
    except Exception as e:
        logging.warning(
            f"⚠️ Could not load state file {state_path}: {e}. Reinitializing."
        )
        return default_state()
    if not isinstance(loaded, dict):
        logging.warning("⚠️ State file is invalid. Reinitializing.")
        return default_state()
    for key, fallback in (
        ("version", 1),
        ("posted_tweets", {}),
        ("posted_by_bsky_uri", {}),
        ("updated_at", None),
    ):
        loaded.setdefault(key, fallback)
    return loaded
def save_state(state, state_path=STATE_PATH):
    """Atomically persist *state* to *state_path* as pretty-printed JSON.

    The state is written to a sibling ``.tmp`` file and then moved over the
    target with os.replace(), so a crash mid-write can never leave a
    truncated state file. Failures are logged, never raised, so a disk
    error does not kill the daemon. Also refreshes state["updated_at"].
    """
    temp_path = f"{state_path}.tmp"
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, state_path)
        logging.info(f"💾 State saved to {state_path}")
    except Exception as e:
        logging.error(f"❌ Failed to save state file {state_path}: {e}")
        # Fix: don't leave a stale/partial .tmp file behind when the dump
        # or replace fails — it would otherwise linger forever.
        try:
            if os.path.exists(temp_path):
                os.remove(temp_path)
        except OSError:
            pass
def remember_posted_tweet(state, candidate, bsky_uri=None):
    """Record a successfully posted candidate in the persistent dedupe state.

    The record is keyed by the canonical tweet URL when available, otherwise
    by a "textmedia:<key>" fingerprint. When *bsky_uri* is given, a reverse
    mapping from Bluesky URI to state key is stored as well.
    """
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    if canonical_tweet_url:
        state_key = canonical_tweet_url
    else:
        state_key = f"textmedia:{candidate['text_media_key']}"
    entry = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "full_clean_text": candidate.get(
            "full_clean_text", candidate["raw_text"]
        ),
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []),
        "resolved_primary_external_url": candidate.get(
            "resolved_primary_external_url"
        ),
        "bsky_uri": bsky_uri,
        "tweet_created_on": candidate["tweet"].created_on,
        "tweet_url": candidate["tweet"].tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }
    state["posted_tweets"][state_key] = entry
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key
def candidate_matches_state(candidate, state):
    """Check whether *candidate* was already posted according to saved state.

    Returns a ``(matched, reason)`` tuple; *reason* is None when unmatched.
    Checks run in priority order: exact canonical tweet URL first, then the
    combined text+media key across all records, then normalized text alone.
    """
    posted_tweets = state.get("posted_tweets", {})
    canonical_tweet_url = candidate["canonical_tweet_url"]
    if canonical_tweet_url and canonical_tweet_url in posted_tweets:
        return True, "state:tweet_url"
    records = list(posted_tweets.values())
    wanted_key = candidate["text_media_key"]
    if any(r.get("text_media_key") == wanted_key for r in records):
        return True, "state:text_media_fingerprint"
    wanted_text = candidate["normalized_text"]
    if any(r.get("normalized_text") == wanted_text for r in records):
        return True, "state:normalized_text"
    return False, None
def prune_state(state, max_entries=5000):
    """Trim posted-tweet records down to the *max_entries* most recent ones.

    Recency is judged by the stored "posted_at" ISO timestamp (records
    missing it rank oldest). The reverse bsky-URI index is filtered so it
    only references surviving keys. Mutates and returns *state*.
    """
    posted_tweets = state.get("posted_tweets", {})
    if len(posted_tweets) <= max_entries:
        return state
    ranked = sorted(
        posted_tweets.items(),
        key=lambda kv: kv[1].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = {key for key, _ in ranked[:max_entries]}
    state["posted_tweets"] = {
        key: record
        for key, record in posted_tweets.items()
        if key in keep_keys
    }
    state["posted_by_bsky_uri"] = {
        bsky_uri: key
        for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items()
        if key in keep_keys
    }
    return state
# --- Bluesky Feed Helpers ---
def extract_urls_from_facets(record):
    """Collect link URIs embedded in a Bluesky record's rich-text facets.

    Returns a list of URI strings; any parsing problem yields whatever was
    gathered so far (possibly an empty list) rather than raising.
    """
    found = []
    try:
        for facet in getattr(record, "facets", None) or []:
            for feature in getattr(facet, "features", None) or []:
                uri = getattr(feature, "uri", None)
                if uri:
                    found.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")
    return found
def get_recent_bsky_posts(client, handle, limit=30):
    """Fetch the account's recent Bluesky posts for live duplicate detection.

    Pulls up to *limit* items from the author feed and, for each original
    top-level post (reposts and replies are skipped), builds the same dedupe
    signals used for tweet candidates: normalized text, canonical non-X
    URLs, a media fingerprint, and the combined text+media key.

    Returns:
        A list of dicts with keys: uri, text, normalized_text,
        canonical_non_x_urls (set), media_fingerprint, text_media_key,
        created_at. Returns an empty list when the feed cannot be fetched.
    """
    recent_posts = []
    try:
        timeline = client.get_author_feed(handle, limit=limit)
        for item in timeline.feed:
            try:
                # A non-None reason marks a repost — not our own content.
                if item.reason is not None:
                    continue
                record = item.post.record
                # Skip replies: only top-level posts participate in dedupe.
                if getattr(record, "reply", None) is not None:
                    continue
                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)
                urls = []
                urls.extend(extract_non_x_urls_from_text(text))
                urls.extend(extract_urls_from_facets(record))
                canonical_non_x_urls = set()
                for url in urls:
                    # Only keep canonical forms of non-t.co, non-X/Twitter
                    # links; those are the URLs worth matching against.
                    if not is_tco_domain(url) and not is_x_or_twitter_domain(
                        url
                    ):
                        canonical = canonicalize_url(
                            normalize_urlish_token(url) or url
                        )
                        if canonical:
                            canonical_non_x_urls.add(canonical)
                media_fingerprint = build_bsky_media_fingerprint(item.post)
                text_media_key = build_text_media_key(
                    normalized_text, media_fingerprint
                )
                recent_posts.append(
                    {
                        "uri": getattr(item.post, "uri", None),
                        "text": text,
                        "normalized_text": normalized_text,
                        "canonical_non_x_urls": canonical_non_x_urls,
                        "media_fingerprint": media_fingerprint,
                        "text_media_key": text_media_key,
                        "created_at": getattr(record, "created_at", None),
                    }
                )
            except Exception as e:
                # One malformed feed item must not abort the whole fetch.
                logging.debug(
                    f"Skipping one Bluesky feed item during dedupe fetch: {e}"
                )
    except Exception as e:
        # FIX #9 — elevated to WARNING so operators notice live dedup is disabled
        logging.warning(
            f"⚠️ Could not fetch recent Bluesky posts for duplicate detection "
            f"(live dedup disabled for this cycle): {e}"
        )
    return recent_posts
# --- Upload / Retry Helpers ---
def get_rate_limit_wait_seconds(error_obj, default_delay):
    """Derive a wait time from rate-limit response headers, when present.

    Honors the ``ratelimit-reset`` (or ``RateLimit-Reset``) header on the
    error object, never waiting less than *default_delay* nor more than
    BSKY_BLOB_UPLOAD_MAX_DELAY. Any missing/unparsable header falls back to
    *default_delay*.
    """
    try:
        headers = getattr(error_obj, "headers", None)
        reset_value = None
        if headers:
            reset_value = headers.get("ratelimit-reset") or headers.get(
                "RateLimit-Reset"
            )
        if reset_value:
            wait_seconds = max(
                int(reset_value) - int(time.time()) + 1, default_delay
            )
            return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY)
    except Exception:
        pass
    return default_delay
def is_transient_error(error_obj):
    """Heuristically decide whether an error is worth a short retry.

    Matches timeout/connection exception names and 5xx status codes in the
    error's repr().
    """
    transient_signals = (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    )
    error_text = repr(error_obj)
    for signal in transient_signals:
        if signal in error_text:
            return True
    return False
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload raw bytes as a Bluesky blob with layered retry handling.

    Two retry budgets operate inside one loop of up to
    BSKY_BLOB_UPLOAD_MAX_RETRIES attempts:
      * rate-limit (429 / RateLimitExceeded) failures use exponential
        backoff, honoring the server's reset header when available;
      * transient network failures get up to
        BSKY_BLOB_TRANSIENT_ERROR_RETRIES linear-backoff retries (these
        also consume outer-loop attempts).
    Any other error is treated as permanent. Returns the uploaded blob ref,
    or None when retries are exhausted or a permanent error occurs.
    """
    last_exception = None
    transient_attempts = 0
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            error_text = str(e)
            is_rate_limited = (
                "429" in error_text or "RateLimitExceeded" in error_text
            )
            if is_rate_limited:
                backoff_delay = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY,
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(
                        f"⏳ Bluesky blob upload rate-limited for {media_label}. "
                        f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue
                else:
                    # Final attempt was itself rate-limited: fall through to
                    # the exhausted-retries log after the loop.
                    logging.warning(
                        f"❌ Exhausted blob upload retries for {media_label} after rate limiting: {repr(e)}"
                    )
                    break
            if (
                is_transient_error(e)
                and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES
            ):
                transient_attempts += 1
                # Linear backoff for transient faults (15s, 30s, 45s, ...).
                wait_seconds = (
                    BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                )
                logging.warning(
                    f"⏳ Transient blob upload failure for {media_label}: {repr(e)}. "
                    f"Transient retry {transient_attempts}/{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue
            # Permanent failure: log details (including the HTTP response
            # when the exception carries one) and give up immediately.
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            if hasattr(e, "response") and e.response is not None:
                try:
                    logging.warning(
                        f"Upload response status: {e.response.status_code}"
                    )
                    logging.warning(
                        f"Upload response body: {e.response.text}"
                    )
                except Exception:
                    pass
            return None
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None
def send_post_with_retry(client, **kwargs):
    """
    Wrapper around client.send_post() with retry logic for transient errors
    and rate limiting.

    Rate-limited (429 / RateLimitExceeded) calls back off exponentially,
    capped at BSKY_SEND_POST_MAX_DELAY and honoring the server's reset
    header; transient network errors back off linearly. Unlike
    upload_blob_with_retry(), failures here RAISE once retries are
    exhausted (or immediately for permanent errors) — a lost post must not
    be silently swallowed.
    """
    last_exception = None
    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            return client.send_post(**kwargs)
        except Exception as e:
            last_exception = e
            error_text = str(e)
            is_rate_limited = (
                "429" in error_text or "RateLimitExceeded" in error_text
            )
            if is_rate_limited:
                backoff_delay = min(
                    BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_SEND_POST_MAX_DELAY,
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
                if attempt < BSKY_SEND_POST_MAX_RETRIES:
                    logging.warning(
                        f"⏳ Bluesky send_post rate-limited. "
                        f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue
                else:
                    logging.error(
                        f"❌ Exhausted send_post retries after rate limiting: {repr(e)}"
                    )
                    raise
            if (
                is_transient_error(e)
                and attempt < BSKY_SEND_POST_MAX_RETRIES
            ):
                # Linear backoff for transient faults (5s, 10s, ...).
                wait_seconds = BSKY_SEND_POST_BASE_DELAY * attempt
                logging.warning(
                    f"⏳ Transient send_post failure: {repr(e)}. "
                    f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue
            # Permanent error: propagate immediately.
            raise
    raise last_exception
# --- Image Compression ---
def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES):
    """Re-encode an image as progressive JPEG under *max_bytes*.

    Two-phase strategy: first, cap the longest side at
    BSKY_IMAGE_MAX_DIMENSION and walk down a JPEG quality ladder; if that is
    not enough, progressively shrink the longest side (1800→1000px) while
    retrying the lower qualities. Returns the compressed bytes, or None if
    even the smallest size/quality combination exceeds *max_bytes* or the
    input cannot be decoded.
    """
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            # Flatten alpha/palette modes so JPEG encoding succeeds.
            img = img.convert("RGB")
            width, height = img.size
            max_dim = max(width, height)
            if max_dim > BSKY_IMAGE_MAX_DIMENSION:
                scale = BSKY_IMAGE_MAX_DIMENSION / max_dim
                new_size = (
                    max(1, int(width * scale)),
                    max(1, int(height * scale)),
                )
                img = img.resize(new_size, Image.LANCZOS)
                logging.info(
                    f"🖼️ Resized post image to {new_size[0]}x{new_size[1]}"
                )
            # Phase 1: quality ladder at current dimensions.
            for quality in [90, 82, 75, 68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]:
                out = io.BytesIO()
                img.save(
                    out,
                    format="JPEG",
                    quality=quality,
                    optimize=True,
                    progressive=True,
                )
                data = out.getvalue()
                logging.info(
                    f"🖼️ Post image candidate size at JPEG quality {quality}: "
                    f"{len(data)} bytes ({len(data) / 1024:.2f} KB)"
                )
                if len(data) <= max_bytes:
                    return data
            # Phase 2: shrink dimensions, re-trying the lower qualities.
            for target_dim in [1800, 1600, 1400, 1200, 1000]:
                resized = img.copy()
                width, height = resized.size
                max_dim = max(width, height)
                if max_dim > target_dim:
                    scale = target_dim / max_dim
                    new_size = (
                        max(1, int(width * scale)),
                        max(1, int(height * scale)),
                    )
                    resized = resized.resize(new_size, Image.LANCZOS)
                for quality in [68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY]:
                    out = io.BytesIO()
                    resized.save(
                        out,
                        format="JPEG",
                        quality=quality,
                        optimize=True,
                        progressive=True,
                    )
                    data = out.getvalue()
                    logging.info(
                        f"🖼️ Post image resized to <= {target_dim}px at quality {quality}: "
                        f"{len(data)} bytes ({len(data) / 1024:.2f} KB)"
                    )
                    if len(data) <= max_bytes:
                        return data
    except Exception as e:
        logging.warning(f"Could not compress post image: {repr(e)}")
    # Fall-through: nothing fit under max_bytes.
    return None
def get_blob_from_url(media_url, client, http_client):
    """Download media from *media_url* and upload it to Bluesky as a blob.

    Images larger than BSKY_IMAGE_MAX_BYTES are recompressed first; if
    compression cannot bring the image under the limit, the media is
    dropped. Non-image content is uploaded as-is. Returns the uploaded blob
    ref, or None on any download/compress/upload failure.
    """
    try:
        r = http_client.get(
            media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True
        )
        if r.status_code != 200:
            logging.warning(
                f"Could not fetch media {media_url}: HTTP {r.status_code}"
            )
            return None
        content = r.content
        if not content:
            logging.warning(
                f"Could not fetch media {media_url}: empty response body"
            )
            return None
        content_type = (r.headers.get("content-type") or "").lower()
        upload_bytes = content
        if content_type.startswith("image/"):
            original_size = len(content)
            logging.info(
                f"🖼️ Downloaded post image {media_url} "
                f"({original_size} bytes / {original_size / 1024:.2f} KB)"
            )
            if original_size > BSKY_IMAGE_MAX_BYTES:
                logging.info(
                    f"🖼️ Post image exceeds safe Bluesky limit "
                    f"({original_size} bytes > {BSKY_IMAGE_MAX_BYTES} bytes). Compressing..."
                )
                compressed = compress_post_image_to_limit(
                    content, BSKY_IMAGE_MAX_BYTES
                )
                if compressed:
                    upload_bytes = compressed
                    logging.info(
                        f"✅ Post image compressed to {len(upload_bytes)} bytes "
                        f"({len(upload_bytes) / 1024:.2f} KB)"
                    )
                else:
                    # Better to skip the image than have the upload rejected.
                    logging.warning(
                        f"⚠️ Could not compress post image to safe limit: {media_url}"
                    )
                    return None
        return upload_blob_with_retry(
            client, upload_bytes, media_label=media_url
        )
    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {repr(e)}")
        return None
def get_blob_from_file(file_path, client):
    """Upload a local media file to Bluesky, returning the blob ref or None.

    MP4 files above MAX_VIDEO_UPLOAD_SIZE_MB are rejected up front. All
    failures (missing file, oversized video, upload error) log a warning
    and return None.
    """
    try:
        if not os.path.exists(file_path):
            logging.warning(
                f"Could not upload local file {file_path}: file does not exist"
            )
            return None
        file_size = os.path.getsize(file_path)
        file_size_mb = file_size / (1024 * 1024)
        logging.info(
            f"📦 Uploading local file {file_path} ({file_size_mb:.2f} MB)"
        )
        is_oversized_video = (
            file_path.lower().endswith(".mp4")
            and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB
        )
        if is_oversized_video:
            logging.warning(
                f"Could not upload local file {file_path}: "
                f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)"
            )
            return None
        with open(file_path, "rb") as f:
            binary_data = f.read()
        return upload_blob_with_retry(
            client, binary_data, media_label=file_path
        )
    except Exception as e:
        logging.warning(
            f"Could not upload local file {file_path}: {repr(e)}"
        )
        response = getattr(e, "response", None)
        if response is not None:
            try:
                logging.warning(
                    f"Upload response status: {response.status_code}"
                )
                logging.warning(
                    f"Upload response body: {response.text}"
                )
            except Exception:
                pass
        return None
def compress_external_thumb_to_limit(
    image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES
):
    """Re-encode a link-card thumbnail as progressive JPEG under *max_bytes*.

    Same two-phase strategy as compress_post_image_to_limit(), but with the
    smaller EXTERNAL_THUMB_* dimension/quality limits appropriate for a
    preview card. Returns the compressed bytes, or None when the image
    cannot be decoded or cannot be brought under *max_bytes*.
    """
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            # Flatten alpha/palette modes so JPEG encoding succeeds.
            img = img.convert("RGB")
            width, height = img.size
            max_dim = max(width, height)
            if max_dim > EXTERNAL_THUMB_MAX_DIMENSION:
                scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim
                new_size = (
                    max(1, int(width * scale)),
                    max(1, int(height * scale)),
                )
                img = img.resize(new_size, Image.LANCZOS)
                logging.info(
                    f"🖼️ Resized external thumb to {new_size[0]}x{new_size[1]}"
                )
            # Phase 1: quality ladder at current dimensions.
            for quality in [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                out = io.BytesIO()
                img.save(
                    out,
                    format="JPEG",
                    quality=quality,
                    optimize=True,
                    progressive=True,
                )
                data = out.getvalue()
                logging.info(
                    f"🖼️ External thumb candidate size at JPEG quality {quality}: "
                    f"{len(data) / 1024:.2f} KB"
                )
                if len(data) <= max_bytes:
                    return data
            # Phase 2: shrink dimensions, re-trying the lower qualities.
            for target_dim in [1000, 900, 800, 700, 600]:
                resized = img.copy()
                width, height = resized.size
                max_dim = max(width, height)
                if max_dim > target_dim:
                    scale = target_dim / max_dim
                    new_size = (
                        max(1, int(width * scale)),
                        max(1, int(height * scale)),
                    )
                    resized = resized.resize(new_size, Image.LANCZOS)
                for quality in [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                    out = io.BytesIO()
                    resized.save(
                        out,
                        format="JPEG",
                        quality=quality,
                        optimize=True,
                        progressive=True,
                    )
                    data = out.getvalue()
                    logging.info(
                        f"🖼️ External thumb resized to <= {target_dim}px at quality {quality}: "
                        f"{len(data) / 1024:.2f} KB"
                    )
                    if len(data) <= max_bytes:
                        return data
    except Exception as e:
        logging.warning(
            f"Could not compress external thumbnail: {repr(e)}"
        )
    # Fall-through: nothing fit under max_bytes.
    return None
def get_external_thumb_blob_from_url(image_url, client, http_client):
    """Download a link-card thumbnail and upload it as a Bluesky blob.

    Thumbnails over EXTERNAL_THUMB_MAX_BYTES are recompressed; if that
    fails, None is returned so the link card is posted without a thumbnail
    (the card itself is never sacrificed for the image).
    """
    try:
        r = http_client.get(
            image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True
        )
        if r.status_code != 200:
            logging.warning(
                f"Could not fetch external thumb {image_url}: HTTP {r.status_code}"
            )
            return None
        content = r.content
        if not content:
            logging.warning(
                f"Could not fetch external thumb {image_url}: empty body"
            )
            return None
        original_size_kb = len(content) / 1024
        logging.info(
            f"🖼️ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)"
        )
        upload_bytes = content
        if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES:
            logging.info(
                f"🖼️ External thumb exceeds safe limit "
                f"({original_size_kb:.2f} KB > {EXTERNAL_THUMB_MAX_BYTES / 1024:.2f} KB). Compressing..."
            )
            compressed = compress_external_thumb_to_limit(
                upload_bytes, EXTERNAL_THUMB_MAX_BYTES
            )
            if compressed:
                upload_bytes = compressed
                logging.info(
                    f"✅ External thumb compressed to {len(upload_bytes) / 1024:.2f} KB"
                )
            else:
                logging.warning(
                    "⚠️ Could not compress external thumb to fit limit. Will omit thumbnail."
                )
                return None
        else:
            logging.info("✅ External thumb already within safe size limit.")
        blob = upload_blob_with_retry(
            client,
            upload_bytes,
            media_label=f"external-thumb:{image_url}",
        )
        if blob:
            return blob
        logging.warning("⚠️ External thumb upload failed. Will omit thumbnail.")
        return None
    except Exception as e:
        logging.warning(
            f"Could not fetch/upload external thumb {image_url}: {repr(e)}"
        )
        return None
def fetch_link_metadata(url, http_client):
    """Fetch Open Graph title/description/image metadata for *url*.

    Returns a dict with keys ``title``, ``description`` (both possibly "")
    and ``image`` (possibly None), preferring og:* tags with fallbacks to
    <title>, name="description", and name="twitter:image". Returns an empty
    dict when the page cannot be fetched or parsed.
    """
    try:
        response = http_client.get(
            url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        title_el = soup.find("meta", property="og:title") or soup.find("title")
        desc_el = (
            soup.find("meta", property="og:description")
            or soup.find("meta", attrs={"name": "description"})
        )
        image_el = (
            soup.find("meta", property="og:image")
            or soup.find("meta", attrs={"name": "twitter:image"})
        )
        if title_el and title_el.has_attr("content"):
            title_value = title_el["content"]
        elif title_el and title_el.text:
            title_value = title_el.text.strip()
        else:
            title_value = ""
        description_value = (
            desc_el["content"] if desc_el and desc_el.has_attr("content") else ""
        )
        image_value = (
            image_el["content"]
            if image_el and image_el.has_attr("content")
            else None
        )
        return {
            "title": title_value,
            "description": description_value,
            "image": image_value,
        }
    except Exception as e:
        logging.warning(
            f"Could not fetch link metadata for {url}: {repr(e)}"
        )
        return {}
def build_external_link_embed(
    url, client, http_client, fallback_title="Link",
    prefetched_metadata=None,
):
    """Build an AppBskyEmbedExternal link-card embed for *url*.

    Fetches OG metadata (unless *prefetched_metadata* is supplied) and
    uploads a thumbnail when one is advertised. Returns the embed object,
    or None when neither title, description nor thumbnail is available —
    i.e. when a card would be empty.
    """
    # FIX #5 — accept pre-fetched metadata to avoid a duplicate HTTP request
    # when the caller already fetched it for build_dynamic_alt.
    link_metadata = prefetched_metadata if prefetched_metadata is not None \
        else fetch_link_metadata(url, http_client)
    thumb_blob = None
    if link_metadata.get("image"):
        thumb_blob = get_external_thumb_blob_from_url(
            link_metadata["image"], client, http_client
        )
        if thumb_blob:
            logging.info("✅ External link card thumbnail prepared successfully")
        else:
            # Thumbnail failures are non-fatal; the card degrades gracefully.
            logging.info("ℹ️ External link card will be posted without thumbnail")
    if (
        link_metadata.get("title")
        or link_metadata.get("description")
        or thumb_blob
    ):
        return models.AppBskyEmbedExternal.Main(
            external=models.AppBskyEmbedExternal.External(
                uri=url,
                title=link_metadata.get("title") or fallback_title,
                description=link_metadata.get("description") or "",
                thumb=thumb_blob,
            )
        )
    return None
def make_rich(content):
    """Convert plain tweet text into a Bluesky rich-text TextBuilder.

    Walks the cleaned text line by line and word by word, emitting:
      * link facets for URL-looking tokens (X/Twitter links stay plain),
      * hashtag facets for "#tag" tokens (trailing punctuation preserved
        as plain text),
      * plain text for everything else, reconstructing the original
        spacing and newlines.
    """
    # FIX #10 — note explaining @mention limitation.
    # Bluesky supports native @mention facets, but resolving a Twitter handle
    # to a Bluesky DID requires an external lookup (e.g. via the atproto
    # identity resolution API). That mapping is not available here, so
    # @mentions are intentionally passed through as plain text. If you add a
    # handle-mapping table in the future, call
    # text_builder.mention(word, did) here instead of text_builder.text(word).
    text_builder = client_utils.TextBuilder()
    content = clean_post_text(content)
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        if not line.strip():
            # Blank line: keep the newline unless it's the final line.
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue
        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                # Consecutive spaces produce empty tokens; re-emit a space.
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue
            cleaned_word = strip_trailing_url_punctuation(word)
            normalized_candidate = normalize_urlish_token(cleaned_word)
            if normalized_candidate:
                if is_x_or_twitter_domain(normalized_candidate):
                    # X/Twitter links stay plain text (not worth a facet).
                    text_builder.text(word)
                else:
                    clean_url_value = clean_url(normalized_candidate)
                    if clean_url_value and is_valid_url(clean_url_value):
                        display_text = cleaned_word
                        text_builder.link(display_text, clean_url_value)
                        # Re-attach punctuation stripped from the URL end.
                        trailing = word[len(cleaned_word):]
                        if trailing:
                            text_builder.text(trailing)
                    else:
                        text_builder.text(word)
            elif cleaned_word.startswith("#") and len(cleaned_word) > 1:
                clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"")
                if clean_tag:
                    text_builder.tag(cleaned_word, clean_tag)
                    trailing = word[len(cleaned_word):]
                    if trailing:
                        text_builder.text(trailing)
                else:
                    text_builder.text(word)
            else:
                text_builder.text(word)
            if i < len(words) - 1:
                text_builder.text(" ")
        if line_idx < len(lines) - 1:
            text_builder.text("\n")
    return text_builder
def build_dynamic_alt(raw_text, link_title=None):
    """Build alt text for attached media from the tweet text.

    URLs are stripped from the cleaned text; when nothing remains, the
    optional *link_title* is used, and finally a generic placeholder.
    The result is capped at DYNAMIC_ALT_MAX_LENGTH with an ellipsis.
    """
    alt = clean_post_text(raw_text)
    alt = alt.replace("\n", " ").strip()
    alt = re.sub(
        r"(?:(?:https?://)|(?:www\.))\S+", "", alt
    ).strip()
    if not alt and link_title:
        alt = link_title.strip()
    if len(alt) > DYNAMIC_ALT_MAX_LENGTH:
        return alt[:DYNAMIC_ALT_MAX_LENGTH - 3] + "..."
    if not alt:
        return "Attached video or image from tweet"
    return alt
def build_video_embed(video_blob, alt_text):
    """Wrap an uploaded video blob in an AppBskyEmbedVideo embed.

    Returns None (after logging an error) when the installed atproto
    library predates video embed support.
    """
    try:
        return models.AppBskyEmbedVideo.Main(alt=alt_text, video=video_blob)
    except AttributeError:
        logging.error(
            "❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto."
        )
        return None
# --- Twitter Scraping ---
def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Scrape recent tweets from @target_handle's profile via a headless browser.

    Reuses a saved browser storage state to skip login when possible;
    otherwise performs a full automated login (including the optional
    email/phone security challenge) and saves the session. Each visible
    tweet article is parsed into a ScrapedTweet with its timestamp, URL,
    text, photo/video media, preview-card URL, and a retweet flag.

    Returns:
        A list of ScrapedTweet objects (empty on login or scrape failure).
    """
    tweets = []
    state_file = "twitter_browser_state.json"
    # FIX #14 — enforce restrictive permissions on the session cookie file
    if os.path.exists(state_file):
        try:
            os.chmod(state_file, SESSION_FILE_PERMISSIONS)
        except Exception as e:
            logging.warning(
                f"⚠️ Could not set permissions on {state_file}: {e}"
            )
    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"],
        )
        clean_ua = (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/145.0.7632.6 Safari/537.36"
        )
        # FIX #1 — all Playwright browser context variables renamed to
        # 'browser_context' throughout this function to eliminate the name
        # collision with the 'context_text' / 'social_context_el' variables
        # used inside the per-article parsing loop below.
        browser_context = None
        needs_login = True
        # FIX #7 — track the session-check page explicitly so we can close
        # it before opening the profile scrape page, preventing a page leak.
        session_check_page = None
        if os.path.exists(state_file):
            logging.info(
                "✅ Found existing browser state. Attempting to bypass login..."
            )
            browser_context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080},
                storage_state=state_file,
            )
            session_check_page = browser_context.new_page()
            session_check_page.goto("https://x.com/home")
            time.sleep(3)
            # Either the compose button or landing on /home proves a live session.
            if (
                session_check_page.locator(
                    '[data-testid="SideNav_NewTweet_Button"]'
                ).is_visible()
                or "/home" in session_check_page.url
            ):
                logging.info("✅ Session is valid!")
                needs_login = False
            else:
                logging.warning(
                    "⚠️ Saved session expired or invalid. Re-logging in..."
                )
                # FIX #7 — close the check page before closing the context
                session_check_page.close()
                session_check_page = None
                browser_context.close()
                browser_context = None
                os.remove(state_file)
        # FIX #7 — always close the session-check page before opening the
        # profile page, whether a re-login was needed or not.
        if session_check_page is not None:
            session_check_page.close()
            session_check_page = None
        if needs_login:
            logging.info(
                "🚀 Launching fresh browser for automated Twitter login..."
            )
            browser_context = browser.new_context(
                user_agent=clean_ua,
                viewport={"width": 1920, "height": 1080},
            )
            login_page = browser_context.new_page()
            try:
                login_page.goto("https://x.com")
                sign_in_button = login_page.get_by_text("Sign in", exact=True)
                sign_in_button.wait_for(state="visible", timeout=15000)
                sign_in_button.click(force=True)
                login_page.wait_for_selector(
                    'h1:has-text("Sign in to X")',
                    state="visible",
                    timeout=25000,
                )
                logging.info(f"👤 Entering username: {username}...")
                time.sleep(1)
                username_input = login_page.locator(
                    'input[autocomplete="username"]'
                ).first
                username_input.wait_for(state="visible", timeout=15000)
                username_input.click(force=True)
                # Typed with a per-key delay to mimic a human and avoid bot checks.
                username_input.press_sequentially(username, delay=100)
                login_page.locator('button:has-text("Next")').first.click(
                    force=True
                )
                login_page.wait_for_selector(
                    'input[name="password"], '
                    'input[data-testid="ocfEnterTextTextInput"], '
                    'input[name="text"]',
                    timeout=15000,
                )
                time.sleep(1)
                # Twitter sometimes interposes an extra identity-confirmation
                # step; answer it with the account email/phone.
                if login_page.locator(
                    'input[data-testid="ocfEnterTextTextInput"]'
                ).is_visible() or login_page.locator(
                    'input[name="text"]'
                ).is_visible():
                    logging.warning(
                        "🛡️ Security challenge detected! Entering email/phone..."
                    )
                    login_page.fill(
                        'input[data-testid="ocfEnterTextTextInput"], '
                        'input[name="text"]',
                        email,
                    )
                    sec_next = login_page.locator(
                        '[data-testid="ocfEnterTextNextButton"], '
                        'span:has-text("Next")'
                    ).first
                    if sec_next.is_visible():
                        sec_next.click(force=True)
                    else:
                        login_page.keyboard.press("Enter")
                    login_page.wait_for_selector(
                        'input[name="password"]', timeout=15000
                    )
                    time.sleep(1)
                logging.info("🔑 Entering password...")
                login_page.fill('input[name="password"]', password)
                login_page.locator('span:has-text("Log in")').first.click()
                login_page.wait_for_url("**/home", timeout=20000)
                time.sleep(3)
                browser_context.storage_state(path=state_file)
                # FIX #14 — set restrictive permissions immediately after writing
                try:
                    os.chmod(state_file, SESSION_FILE_PERMISSIONS)
                except Exception as chmod_err:
                    logging.warning(
                        f"⚠️ Could not set permissions on {state_file} "
                        f"after save: {chmod_err}"
                    )
                logging.info("✅ Login successful. Browser state saved.")
            except Exception as e:
                take_error_screenshot(login_page, "login_failed")
                logging.error(f"❌ Login failed: {e}")
                login_page.close()
                browser.close()
                return []
            # FIX #7 — close the login page cleanly before opening scrape page
            login_page.close()
        logging.info(
            f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets..."
        )
        scrape_page = browser_context.new_page()
        scrape_page.goto(f"https://x.com/{target_handle}")
        try:
            scrape_page.wait_for_selector("article", timeout=20000)
            time.sleep(2)
            articles = scrape_page.locator("article").all()
            logging.info(
                f"📊 Found {len(articles)} tweets on screen. "
                f"Parsing up to {SCRAPE_TWEET_LIMIT}..."
            )
            for article in articles[:SCRAPE_TWEET_LIMIT]:
                try:
                    time_el = article.locator("time").first
                    if not time_el.is_visible():
                        continue
                    created_at = time_el.get_attribute("datetime")
                    tweet_url = None
                    # The permalink is the anchor wrapping the timestamp.
                    time_link = article.locator("a:has(time)").first
                    if time_link.is_visible():
                        href = time_link.get_attribute("href")
                        if href:
                            tweet_url = (
                                f"https://x.com{href}"
                                if href.startswith("/")
                                else href
                            )
                    # --- Retweet detection ---
                    is_retweet = False
                    try:
                        # FIX #1 — renamed from 'context' to 'social_context_el'
                        social_context_el = article.locator(
                            '[data-testid="socialContext"]'
                        ).first
                        if social_context_el.is_visible():
                            context_text = social_context_el.inner_text().lower()
                            # Covers English, Catalan and Spanish UI strings.
                            repost_keywords = [
                                "reposted",
                                "retweeted",
                                "ha repostejat",
                                "ha retuitat",
                                "repostejat",
                                "ha reposteado",
                                "retuiteó",
                            ]
                            if any(kw in context_text for kw in repost_keywords):
                                is_retweet = True
                                logging.info(
                                    f"🔁 Detected retweet/repost: {tweet_url}"
                                )
                    except Exception:
                        pass
                    text_locator = article.locator(
                        '[data-testid="tweetText"]'
                    ).first
                    text = (
                        text_locator.inner_text()
                        if text_locator.is_visible()
                        else ""
                    )
                    media_urls = []
                    photo_locators = article.locator(
                        '[data-testid="tweetPhoto"] img'
                    ).all()
                    for img in photo_locators:
                        src = img.get_attribute("src")
                        if src:
                            # Request the large image variant.
                            src = re.sub(r"&name=\w+", "&name=large", src)
                            media_urls.append((src, "photo"))
                    video_locators = article.locator(
                        '[data-testid="videoPlayer"]'
                    ).all()
                    if video_locators:
                        # The actual stream URL is captured later from the
                        # tweet page; here we only flag that video exists.
                        media_urls.append((tweet_url or "", "video"))
                    # --- Card URL extraction (link preview card) ---
                    card_url = None
                    try:
                        card_locator = article.locator(
                            '[data-testid="card.wrapper"] a[href]'
                        ).first
                        if card_locator.is_visible():
                            card_href = card_locator.get_attribute("href")
                            if card_href:
                                card_url = card_href.strip()
                                logging.info(
                                    f"🃏 Scraped card URL from tweet: {card_url}"
                                )
                    except Exception:
                        pass
                    if not card_url:
                        try:
                            card_role_link = article.locator(
                                '[data-testid="card.wrapper"] [role="link"]'
                            ).first
                            if card_role_link.is_visible():
                                card_a = card_role_link.locator(
                                    "a[href]"
                                ).first
                                if card_a.is_visible():
                                    card_href = card_a.get_attribute("href")
                                    if card_href:
                                        card_url = card_href.strip()
                                        logging.info(
                                            f"🃏 Scraped card URL (fallback) from tweet: {card_url}"
                                        )
                        except Exception:
                            pass
                    tweets.append(
                        ScrapedTweet(
                            created_at,
                            text,
                            media_urls,
                            tweet_url=tweet_url,
                            card_url=card_url,
                            is_retweet=is_retweet,
                        )
                    )
                except Exception as e:
                    # One broken article must not abort the whole scrape.
                    logging.warning(
                        f"⚠️ Failed to parse a specific tweet: {e}"
                    )
                    continue
        except Exception as e:
            take_error_screenshot(scrape_page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")
        browser.close()
    return tweets
# --- Video Extraction & Processing ---
def extract_video_url_from_tweet_page(browser_context, tweet_url):
    """Open the tweet page and sniff network traffic for a playable video URL.

    Listens to all responses while interacting with the video player and
    captures the first HLS (.m3u8) playlist or, failing that, the first
    video MP4 (audio-only MP4 variants and .m4s segments are ignored).
    HLS is preferred over direct MP4.

    Returns:
        The captured media URL, or None if nothing playable was observed.
    """
    # FIX #1 — parameter renamed from 'context' to 'browser_context'
    page = browser_context.new_page()
    best_m3u8_url = None
    best_video_mp4_url = None
    seen_urls = set()
    def is_audio_only_mp4(url, content_type):
        # Twitter serves separate audio-only MP4 tracks; uploading one of
        # those would produce a silent-video-less post.
        url_l = url.lower()
        content_type_l = content_type.lower()
        return (
            "/aud/" in url_l
            or "/audio/" in url_l
            or "mp4a" in url_l
            or (
                "audio/" in content_type_l
                and "video/" not in content_type_l
            )
        )
    def handle_response(response):
        # Network listener: classify each response once and remember the
        # first acceptable playlist/MP4 URL of each kind.
        nonlocal best_m3u8_url, best_video_mp4_url
        try:
            url = response.url
            if url in seen_urls:
                return
            seen_urls.add(url)
            url_l = url.lower()
            content_type = response.headers.get("content-type", "")
            content_type_l = content_type.lower()
            if ".m4s" in url_l:
                # Fragmented-stream segments are useless on their own.
                return
            if (
                ".m3u8" in url_l
                or "application/vnd.apple.mpegurl" in content_type_l
                or "application/x-mpegurl" in content_type_l
            ):
                if best_m3u8_url is None:
                    best_m3u8_url = url
                    logging.info(f"📺 Found HLS playlist URL: {url}")
                return
            if (
                ".mp4" in url_l
                or "video/mp4" in content_type_l
                or "audio/mp4" in content_type_l
            ):
                if is_audio_only_mp4(url, content_type):
                    logging.info(f"🔇 Ignoring audio-only MP4: {url}")
                    return
                if best_video_mp4_url is None:
                    best_video_mp4_url = url
                    logging.info(f"🎥 Found VIDEO MP4 URL: {url}")
                return
        except Exception as e:
            logging.debug(f"Response parsing error: {e}")
    page.on("response", handle_response)
    def current_best():
        # HLS playlist wins over a direct MP4 when both were seen.
        return best_m3u8_url or best_video_mp4_url
    try:
        logging.info(
            f"🎬 Opening tweet page to capture video URL: {tweet_url}"
        )
        page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(2)
        player = page.locator('[data-testid="videoPlayer"]').first
        if player.count() > 0:
            try:
                player.scroll_into_view_if_needed(timeout=5000)
            except Exception:
                pass
            try:
                player.click(force=True, timeout=5000)
                logging.info("▶️ Clicked video player")
            except Exception as e:
                logging.info(f"⚠️ First player click failed: {e}")
        else:
            logging.warning("⚠️ No video player locator found on tweet page")
        # FIX #11 — use named constant VIDEO_PLAYER_WAIT_ROUNDS
        for _ in range(VIDEO_PLAYER_WAIT_ROUNDS):
            if current_best():
                break
            time.sleep(1)
        if not current_best() and player.count() > 0:
            # Second chance: click again and try the keyboard play toggle.
            logging.info(
                "🔁 No media URL found yet, retrying player interaction..."
            )
            try:
                player.click(force=True, timeout=5000)
                # FIX #11 — use named constant PLAYWRIGHT_RETRY_SLEEP_S
                time.sleep(PLAYWRIGHT_RETRY_SLEEP_S)
            except Exception as e:
                logging.info(f"⚠️ Retry click failed: {e}")
            try:
                page.keyboard.press("Space")
                time.sleep(1)
            except Exception:
                pass
            # FIX #11 — use named constant VIDEO_PLAYER_RETRY_ROUNDS
            for _ in range(VIDEO_PLAYER_RETRY_ROUNDS):
                if current_best():
                    break
                time.sleep(1)
        selected_url = current_best()
        if selected_url:
            logging.info(f"✅ Selected media URL for download: {selected_url}")
        else:
            logging.warning(
                f"⚠️ No playable media URL detected on tweet page: {tweet_url}"
            )
        return selected_url
    except Exception as e:
        logging.warning(
            f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}"
        )
        return None
    finally:
        page.close()
def _probe_video_duration(file_path):
    """
    Return the duration (in seconds, as a float) of the media file at
    *file_path*, probed with ffprobe run as a subprocess.

    FIX #6 — ffprobe is used instead of VideoFileClip because moviepy can
    hang indefinitely on corrupt/truncated files, whereas the subprocess
    call is bounded by a hard FFPROBE_TIMEOUT_SECONDS timeout.

    Raises:
        RuntimeError: on ANY failure — non-zero ffprobe exit status,
            timeout, a missing/unexecutable ffprobe binary, empty output,
            or unparseable duration text (ffprobe prints "N/A" for
            streams without a known duration).
    """
    probe_cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        file_path,
    ]
    try:
        result = subprocess.run(
            probe_cmd,
            capture_output=True,
            text=True,
            timeout=FFPROBE_TIMEOUT_SECONDS,
        )
    except subprocess.TimeoutExpired:
        raise RuntimeError(
            f"ffprobe timed out after {FFPROBE_TIMEOUT_SECONDS}s on {file_path}"
        )
    except OSError as exec_err:
        # FIX — a missing ffprobe binary previously escaped as a raw
        # FileNotFoundError, violating the documented RuntimeError contract.
        raise RuntimeError(f"ffprobe could not be executed: {exec_err}")
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe exited with code {result.returncode}: "
            f"{result.stderr.strip()}"
        )
    duration_str = result.stdout.strip()
    if not duration_str:
        raise RuntimeError("ffprobe returned empty duration output")
    try:
        return float(duration_str)
    except ValueError:
        # FIX — ffprobe can emit "N/A" here; float() used to raise a raw
        # ValueError instead of the promised RuntimeError.
        raise RuntimeError(
            f"ffprobe returned unparseable duration output: {duration_str!r}"
        )
def download_and_crop_video(video_url, output_path):
    """
    Download a tweet video, trim it to VIDEO_MAX_DURATION_SECONDS, and
    compress it for upload, leaving the final file at *output_path*.

    Pipeline (each external stage bounded by SUBPROCESS_TIMEOUT_SECONDS):
      1. ffmpeg stream-copy download (HLS .m3u8 or direct MP4) to
         <stem>_source.mp4.
      2. Duration probe via _probe_video_duration (ffprobe, hard timeout).
      3. moviepy trim of the first min(duration, max) seconds to
         <stem>_trimmed.mp4.
      4. ffmpeg re-encode (width capped at 720, CRF 30, faststart) to
         <stem>_compressed.mp4.
      5. Atomic os.replace() of the compressed file onto *output_path*.

    Args:
        video_url: HLS playlist (.m3u8) or direct MP4 URL.
        output_path: Destination path, conventionally ending in ".mp4".

    Returns:
        *output_path* on success, or None on any failure. Errors are
        logged, never raised; all intermediate files are cleaned up in
        the finally block.
    """
    # FIX — derive temp-file names from the filename stem instead of
    # output_path.replace(".mp4", ...): str.replace substitutes EVERY
    # ".mp4" occurrence anywhere in the path (e.g. a directory named
    # "clips.mp4"), and when the path contained no ".mp4" at all every
    # temp name silently collapsed onto output_path itself.
    stem, ext = os.path.splitext(output_path)
    if ext.lower() != ".mp4":
        stem = output_path
    temp_input = f"{stem}_source.mp4"
    temp_trimmed = f"{stem}_trimmed.mp4"
    temp_output = f"{stem}_compressed.mp4"
    try:
        logging.info(
            f"⬇️ Downloading video source with ffmpeg: {video_url}"
        )
        video_url_l = video_url.lower()
        if ".m3u8" in video_url_l:
            logging.info("📺 Using HLS ffmpeg mode")
            download_cmd = [
                "ffmpeg", "-y",
                "-protocol_whitelist", "file,http,https,tcp,tls,crypto",
                "-allowed_extensions", "ALL",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        else:
            logging.info("🎥 Using direct MP4 ffmpeg mode")
            download_cmd = [
                "ffmpeg", "-y",
                "-i", video_url,
                "-c", "copy",
                temp_input,
            ]
        download_result = subprocess.run(
            download_cmd,
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
        if download_result.returncode != 0:
            logging.error(
                f"❌ ffmpeg download failed:\n{download_result.stderr}"
            )
            return None
        if (
            not os.path.exists(temp_input)
            or os.path.getsize(temp_input) == 0
        ):
            logging.error(
                "❌ Downloaded video source file is missing or empty."
            )
            return None
        logging.info(f"✅ Video downloaded: {temp_input}")
        # FIX #6 — probe duration with ffprobe (hard timeout) instead of
        # VideoFileClip, which can hang indefinitely on corrupt files.
        try:
            duration = _probe_video_duration(temp_input)
        except RuntimeError as probe_err:
            logging.error(f"❌ Could not probe video duration: {probe_err}")
            return None
        if duration <= 0:
            logging.error(
                "❌ Downloaded video has invalid or unknown duration."
            )
            return None
        end_time = min(VIDEO_MAX_DURATION_SECONDS, duration)
        # FIX #2 — wrap VideoFileClip usage in nested try/finally blocks so
        # both the source clip and the subclip handles are always closed,
        # even if write_videofile raises mid-way.
        video_clip = VideoFileClip(temp_input)
        try:
            # moviepy renamed subclip() to subclipped() in v2; support both.
            if hasattr(video_clip, "subclipped"):
                cropped_clip = video_clip.subclipped(0, end_time)
            else:
                cropped_clip = video_clip.subclip(0, end_time)
            try:
                cropped_clip.write_videofile(
                    temp_trimmed,
                    codec="libx264",
                    audio_codec="aac",
                    preset="veryfast",
                    bitrate="1800k",
                    audio_bitrate="128k",
                    logger=None,
                )
            finally:
                cropped_clip.close()  # FIX #2 — always close subclip
        finally:
            video_clip.close()  # FIX #2 — always close source clip
        if (
            not os.path.exists(temp_trimmed)
            or os.path.getsize(temp_trimmed) == 0
        ):
            logging.error("❌ Trimmed video output is missing or empty.")
            return None
        trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024)
        logging.info(
            f"📦 Trimmed video size before compression: {trimmed_size_mb:.2f} MB"
        )
        # Re-encode to keep the upload under Bluesky's size limits:
        # cap width at 720 (never upscale), CRF 30 with a 1800k rate cap,
        # and +faststart so the moov atom leads the file for streaming.
        compress_cmd = [
            "ffmpeg", "-y",
            "-i", temp_trimmed,
            "-vf", "scale='min(720,iw)':-2",
            "-c:v", "libx264",
            "-preset", "veryfast",
            "-crf", "30",
            "-maxrate", "1800k",
            "-bufsize", "3600k",
            "-c:a", "aac",
            "-b:a", "128k",
            "-movflags", "+faststart",
            temp_output,
        ]
        compress_result = subprocess.run(
            compress_cmd,
            capture_output=True,
            text=True,
            timeout=SUBPROCESS_TIMEOUT_SECONDS,
        )
        if compress_result.returncode != 0:
            logging.error(
                f"❌ ffmpeg compression failed:\n{compress_result.stderr}"
            )
            return None
        if (
            not os.path.exists(temp_output)
            or os.path.getsize(temp_output) == 0
        ):
            logging.error("❌ Compressed video output is missing or empty.")
            return None
        final_size_mb = os.path.getsize(temp_output) / (1024 * 1024)
        logging.info(
            f"✅ Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)"
        )
        os.replace(temp_output, output_path)
        logging.info(f"✅ Final video ready: {output_path}")
        return output_path
    except subprocess.TimeoutExpired:
        logging.error(
            f"❌ ffmpeg subprocess timed out after {SUBPROCESS_TIMEOUT_SECONDS}s"
        )
        return None
    except Exception as e:
        logging.error(f"❌ Error processing video: {repr(e)}")
        return None
    finally:
        remove_file_quietly(temp_input)
        remove_file_quietly(temp_trimmed)
        # temp_output was either renamed to output_path via os.replace()
        # or never created; remove_file_quietly is a no-op if it doesn't
        # exist.
        remove_file_quietly(temp_output)
# --- Deduplication ---
def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """
    Decide whether *candidate* duplicates one of *recent_bsky_posts*.

    For each recent post, three checks run in order of specificity:
      1. same normalized text AND identical non-empty set of non-X URLs;
      2. identical text+media fingerprint key;
      3. same normalized text alone.

    Returns a (matched, reason) tuple — (True, "<reason tag>") on the
    first hit, otherwise (False, None).
    """
    cand_urls = candidate["canonical_non_x_urls"]
    cand_key = candidate["text_media_key"]
    cand_text = candidate["normalized_text"]
    for post in recent_bsky_posts:
        same_text = cand_text == post["normalized_text"]
        if cand_urls and same_text and cand_urls == post["canonical_non_x_urls"]:
            return True, "bsky:normalized_text_plus_non_x_urls"
        if cand_key == post["text_media_key"]:
            return True, "bsky:text_media_fingerprint"
        if same_text:
            return True, "bsky:normalized_text"
    return False, None
def sync_feeds(args):
    """
    Run one full Twitter→Bluesky sync cycle.

    Phases:
      1. Load + prune local state, scrape recent tweets via Playwright.
      2. Unless --dry-run: log into Bluesky and fetch recent posts for
         duplicate detection.
      3. Cheap prefilter (age, retweets, known tweet URLs, blank tweets)
         with no network calls.
      4. Per-candidate enrichment (URL resolution, text normalization,
         media/text fingerprints) and duplicate filtering against both
         local state and the recent Bluesky posts.
      5. Post each surviving candidate with the richest embed available
         (video > images > external link card > plain text), updating
         local state and the in-memory dedupe window after every post.

    Any exception escaping a phase is caught by the outer handler at the
    bottom and logged; this function never raises to the caller.
    """
    logging.info("🔄 Starting sync cycle...")
    dry_run = getattr(args, "dry_run", False)
    bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS
    if dry_run:
        logging.info(
            "🧪 DRY RUN MODE — no posts will be created on Bluesky."
        )
    try:
        state = load_state(STATE_PATH)
        # FIX #8 — prune on load so the state file never grows unbounded
        # between runs, not only after individual posts.
        state = prune_state(state, max_entries=5000)
        tweets = scrape_tweets_via_playwright(
            args.twitter_username,
            args.twitter_password,
            args.twitter_email,
            args.twitter_handle,
        )
        if not tweets:
            logging.warning(
                "⚠️ No tweets found or failed to fetch. "
                "Skipping Bluesky sync for this cycle."
            )
            return
        # In dry-run mode we never authenticate against Bluesky; the
        # client stays None and the remote dedupe window stays empty.
        bsky_client = None
        if not dry_run:
            bsky_client = create_bsky_client(
                args.bsky_base_url,
                args.bsky_handle,
                args.bsky_password,
            )
        recent_bsky_posts = []
        if not dry_run:
            recent_bsky_posts = get_recent_bsky_posts(
                bsky_client,
                args.bsky_handle,
                limit=DEDUPE_BSKY_LIMIT,
            )
        logging.info(
            f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts "
            f"for duplicate detection."
        )
        logging.info(
            f"🧠 Local state currently tracks "
            f"{len(state.get('posted_tweets', {}))} posted items."
        )
        too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS)
        logging.info(f"🕒 Will ignore tweets older than: {too_old_cutoff}")
        candidate_tweets = []
        # --- Cheap prefilter before expensive processing ---
        # Drops old tweets, retweets, already-posted tweet URLs and blank
        # tweets without any network work. reversed() makes the loop run
        # oldest-first so posts land on Bluesky in chronological order.
        cheap_candidates = []
        for tweet in reversed(tweets):
            try:
                tweet_time = arrow.get(tweet.created_on)
                if tweet_time < too_old_cutoff:
                    logging.info(f"⏭️ Skipping old tweet from {tweet_time}")
                    continue
                if tweet.is_retweet:
                    logging.info(
                        f"⏭️ Skipping retweet/repost: {tweet.tweet_url}"
                    )
                    continue
                canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url)
                if canonical_tweet_url and canonical_tweet_url in state.get(
                    "posted_tweets", {}
                ):
                    logging.info(
                        f"⚡ Early skip due to known tweet URL in local state: "
                        f"{canonical_tweet_url}"
                    )
                    continue
                scraped_text = clean_post_text(tweet.text or "")
                if not scraped_text and not tweet.media:
                    logging.info(
                        f"⏭️ Skipping empty/blank tweet from {tweet_time}"
                    )
                    continue
                cheap_candidates.append(
                    (tweet, tweet_time, canonical_tweet_url)
                )
            except Exception as e:
                logging.warning(f"⚠️ Failed during cheap prefilter: {e}")
        logging.info(
            f"{len(cheap_candidates)} tweets remain after cheap prefilter."
        )
        # Enrichment phase: one shared httpx client resolves t.co/short
        # URLs and builds text, URL sets, and fingerprints per candidate.
        with httpx.Client() as resolve_http_client:
            for tweet, tweet_time, canonical_tweet_url in cheap_candidates:
                try:
                    (
                        full_clean_text,
                        resolved_primary_external_url,
                    ) = build_effective_tweet_text(tweet, resolve_http_client)
                    normalized_text = normalize_post_text(full_clean_text)
                    if not normalized_text and not tweet.media:
                        logging.info(
                            f"⏭️ Skipping empty/blank tweet after enrichment "
                            f"from {tweet_time}"
                        )
                        continue
                    ordered_non_x_urls = extract_ordered_non_x_urls(
                        full_clean_text
                    )
                    # Canonical set of non-X URLs used for duplicate
                    # detection; t.co and x.com/twitter.com links are
                    # excluded.
                    canonical_non_x_urls = set()
                    if resolved_primary_external_url:
                        canonical_non_x_urls.add(
                            canonicalize_url(resolved_primary_external_url)
                        )
                    for raw_url in ordered_non_x_urls:
                        if not is_tco_domain(
                            raw_url
                        ) and not is_x_or_twitter_domain(raw_url):
                            canonical_non_x_urls.add(
                                canonicalize_url(raw_url)
                            )
                    # Primary URL preference: resolved redirect target,
                    # then first visible non-X URL, then first ordered URL.
                    primary_non_x_url = None
                    if resolved_primary_external_url:
                        primary_non_x_url = resolved_primary_external_url
                    else:
                        primary_non_x_url = extract_first_visible_non_x_url(
                            full_clean_text
                        )
                    if not primary_non_x_url and ordered_non_x_urls:
                        primary_non_x_url = ordered_non_x_urls[0]
                    has_video = any(
                        getattr(m, "type", None) == "video"
                        for m in (tweet.media or [])
                    )
                    has_photo = any(
                        getattr(m, "type", None) == "photo"
                        for m in (tweet.media or [])
                    )
                    raw_text = choose_final_visible_text(
                        full_clean_text,
                        primary_non_x_url=primary_non_x_url,
                        prefer_full_text_without_url=False,
                    )
                    media_fingerprint = build_media_fingerprint(tweet)
                    text_media_key = build_text_media_key(
                        normalized_text, media_fingerprint
                    )
                    candidate = {
                        "tweet": tweet,
                        "tweet_time": tweet_time,
                        "raw_text": raw_text,
                        "full_clean_text": full_clean_text,
                        "normalized_text": normalized_text,
                        "media_fingerprint": media_fingerprint,
                        "text_media_key": text_media_key,
                        "canonical_tweet_url": canonical_tweet_url,
                        "canonical_non_x_urls": canonical_non_x_urls,
                        "ordered_non_x_urls": ordered_non_x_urls,
                        "primary_non_x_url": primary_non_x_url,
                        "resolved_primary_external_url": resolved_primary_external_url,
                        "looks_like_title_plus_url": looks_like_title_plus_url_post(
                            full_clean_text
                        ),
                        "has_video": has_video,
                        "has_photo": has_photo,
                    }
                    # Duplicate filtering: first against the persisted
                    # local state, then against recent Bluesky posts.
                    is_dup_state, reason_state = candidate_matches_state(
                        candidate, state
                    )
                    if is_dup_state:
                        logging.info(
                            f"⏭️ Skipping candidate due to local state duplicate "
                            f"match on: {reason_state}"
                        )
                        continue
                    is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(
                        candidate, recent_bsky_posts
                    )
                    if is_dup_bsky:
                        logging.info(
                            f"⏭️ Skipping candidate due to recent Bluesky duplicate "
                            f"match on: {reason_bsky}"
                        )
                        continue
                    candidate_tweets.append(candidate)
                except Exception as e:
                    logging.warning(
                        f"⚠️ Failed to prepare candidate tweet: {e}"
                    )
        logging.info(
            f"📬 {len(candidate_tweets)} tweets remain after duplicate filtering."
        )
        if not candidate_tweets:
            logging.info(
                "✅ No new tweets need posting after duplicate comparison."
            )
            return
        # Posting phase: one headless Chromium context (reused for video
        # URL extraction) and one httpx client (media downloads) serve all
        # candidates.
        new_posts = 0
        browser_state_file = "twitter_browser_state.json"
        with sync_playwright() as p, httpx.Client() as media_http_client:
            browser = p.chromium.launch(
                headless=True,
                args=["--disable-blink-features=AutomationControlled"],
            )
            context_kwargs = {
                "user_agent": (
                    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/145.0.7632.6 Safari/537.36"
                ),
                "viewport": {"width": 1920, "height": 1080},
            }
            # Reuse a saved login session if one exists on disk.
            if os.path.exists(browser_state_file):
                context_kwargs["storage_state"] = browser_state_file
            # FIX #1 — renamed from 'context' to 'browser_context'
            browser_context = browser.new_context(**context_kwargs)
            for candidate in candidate_tweets:
                tweet = candidate["tweet"]
                tweet_time = candidate["tweet_time"]
                raw_text = candidate["raw_text"]
                full_clean_text = candidate["full_clean_text"]
                logging.info(
                    f"📝 {'[DRY RUN] Would post' if dry_run else 'Posting'} "
                    f"missing tweet from {tweet_time} to Bluesky..."
                )
                # Dry run: log what would be posted, record it in state so
                # repeated dry runs stay stable, and move on.
                if dry_run:
                    logging.info(
                        f"   📄 Text: {raw_text[:200]}"
                        f"{'...' if len(raw_text) > 200 else ''}"
                    )
                    logging.info(
                        f"   🔗 Primary external URL: "
                        f"{candidate.get('resolved_primary_external_url', 'None')}"
                    )
                    logging.info(
                        f"   🃏 Card URL: {getattr(tweet, 'card_url', 'None')}"
                    )
                    logging.info(
                        f"   🎬 Has video: {candidate.get('has_video', False)}"
                    )
                    logging.info(
                        f"   🖼️ Has photo: {candidate.get('has_photo', False)}"
                    )
                    logging.info(
                        f"   🔁 Is retweet: {getattr(tweet, 'is_retweet', False)}"
                    )
                    remember_posted_tweet(
                        state,
                        candidate,
                        bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}",
                    )
                    save_state(state, STATE_PATH)
                    new_posts += 1
                    continue
                # FIX #5 — fetch link metadata once here so we can pass the
                # OG title to build_dynamic_alt AND reuse it inside
                # build_external_link_embed, avoiding a duplicate HTTP request
                # for the same URL.
                link_meta_for_alt: dict = {}
                if candidate.get("resolved_primary_external_url"):
                    try:
                        link_meta_for_alt = fetch_link_metadata(
                            candidate["resolved_primary_external_url"],
                            media_http_client,
                        )
                    except Exception:
                        pass
                rich_text = make_rich(raw_text)
                dynamic_alt = build_dynamic_alt(
                    full_clean_text,
                    link_title=link_meta_for_alt.get("title"),
                )
                # NOTE(review): the media/embed building below runs outside
                # the per-post try further down — an unexpected exception
                # here aborts the whole cycle via the outer handler instead
                # of skipping just this candidate. Confirm that is intended.
                image_embeds = []
                video_embed = None
                external_embed = None
                media_upload_failures = []
                has_video = candidate.get("has_video", False)
                if has_video:
                    video_media = next(
                        (
                            m
                            for m in (tweet.media or [])
                            if getattr(m, "type", None) == "video"
                        ),
                        None,
                    )
                    if video_media:
                        if not tweet.tweet_url:
                            logging.warning(
                                "⚠️ Tweet has video marker but no tweet URL. "
                                "Skipping video."
                            )
                            media_upload_failures.append("video:no_tweet_url")
                        else:
                            # Video pipeline: resolve playable URL from the
                            # tweet page, download+trim+compress, upload as
                            # a blob, then build the video embed. Each step
                            # degrades gracefully on failure.
                            temp_video_base = make_unique_video_temp_base(
                                tweet.tweet_url
                            )
                            temp_video_path = f"{temp_video_base}.mp4"
                            try:
                                real_video_url = (
                                    extract_video_url_from_tweet_page(
                                        browser_context, tweet.tweet_url
                                    )
                                )
                                if not real_video_url:
                                    logging.warning(
                                        f"⚠️ Could not resolve playable video URL "
                                        f"for {tweet.tweet_url}"
                                    )
                                    media_upload_failures.append(
                                        f"video:resolve_failed:{tweet.tweet_url}"
                                    )
                                else:
                                    cropped_video_path = (
                                        download_and_crop_video(
                                            real_video_url, temp_video_path
                                        )
                                    )
                                    if not cropped_video_path:
                                        logging.warning(
                                            f"⚠️ Video download/crop failed for "
                                            f"{tweet.tweet_url}"
                                        )
                                        media_upload_failures.append(
                                            f"video:crop_failed:{tweet.tweet_url}"
                                        )
                                    else:
                                        video_blob = get_blob_from_file(
                                            cropped_video_path, bsky_client
                                        )
                                        if not video_blob:
                                            logging.warning(
                                                f"⚠️ Video upload blob failed for "
                                                f"{tweet.tweet_url}"
                                            )
                                            media_upload_failures.append(
                                                f"video:upload_failed:{tweet.tweet_url}"
                                            )
                                        else:
                                            video_embed = build_video_embed(
                                                video_blob, dynamic_alt
                                            )
                                            if not video_embed:
                                                media_upload_failures.append(
                                                    f"video:embed_failed:{tweet.tweet_url}"
                                                )
                            finally:
                                # Best-effort cleanup of every temp file the
                                # video pipeline may have left behind.
                                remove_file_quietly(temp_video_path)
                                remove_file_quietly(
                                    f"{temp_video_base}_source.mp4"
                                )
                                remove_file_quietly(
                                    f"{temp_video_base}_trimmed.mp4"
                                )
                                remove_file_quietly(
                                    f"{temp_video_base}_compressed.mp4"
                                )
                    if not video_embed:
                        logging.warning(
                            "⚠️ Tweet contains video, but video could not be "
                            "posted. Skipping photo fallback for this tweet."
                        )
                else:
                    # No video: upload every photo as an image embed.
                    if tweet.media:
                        for media in tweet.media:
                            if media.type == "photo":
                                blob = get_blob_from_url(
                                    media.media_url_https,
                                    bsky_client,
                                    media_http_client,
                                )
                                if blob:
                                    image_embeds.append(
                                        models.AppBskyEmbedImages.Image(
                                            alt=dynamic_alt,
                                            image=blob,
                                        )
                                    )
                                else:
                                    media_upload_failures.append(
                                        f"photo:{media.media_url_https}"
                                    )
                # --- External link card logic ---
                # Only built when no native media embed succeeded.
                if not video_embed and not image_embeds:
                    candidate_url = candidate.get(
                        "resolved_primary_external_url"
                    )
                    if candidate_url:
                        if candidate.get("looks_like_title_plus_url"):
                            logging.info(
                                f"🔗 Detected title+URL post style. "
                                f"Using resolved URL for external card: "
                                f"{candidate_url}"
                            )
                        else:
                            logging.info(
                                f"🔗 Using resolved first external URL for "
                                f"external card: {candidate_url}"
                            )
                        # FIX #5 — pass the already-fetched metadata so
                        # build_external_link_embed skips a duplicate HTTP fetch.
                        external_embed = build_external_link_embed(
                            candidate_url,
                            bsky_client,
                            media_http_client,
                            fallback_title="Link",
                            prefetched_metadata=link_meta_for_alt or None,
                        )
                        if external_embed:
                            logging.info(
                                f"✅ Built external link card for URL: "
                                f"{candidate_url}"
                            )
                        else:
                            logging.info(
                                f" Could not build external link card metadata "
                                f"for URL: {candidate_url}"
                            )
                try:
                    # Pick the richest embed that was built:
                    # video > images > external link card > text only.
                    post_result = None
                    post_mode = "text"
                    if video_embed:
                        post_result = send_post_with_retry(
                            bsky_client,
                            text=rich_text,
                            embed=video_embed,
                            langs=bsky_langs,
                        )
                        post_mode = "video"
                    elif image_embeds:
                        embed = models.AppBskyEmbedImages.Main(
                            images=image_embeds
                        )
                        post_result = send_post_with_retry(
                            bsky_client,
                            text=rich_text,
                            embed=embed,
                            langs=bsky_langs,
                        )
                        post_mode = f"images:{len(image_embeds)}"
                    elif external_embed:
                        post_result = send_post_with_retry(
                            bsky_client,
                            text=rich_text,
                            embed=external_embed,
                            langs=bsky_langs,
                        )
                        post_mode = "external_link_card"
                    else:
                        post_result = send_post_with_retry(
                            bsky_client,
                            text=rich_text,
                            langs=bsky_langs,
                        )
                        post_mode = "text_only"
                    # Persist state immediately after each post so a crash
                    # mid-cycle cannot cause re-posting next run.
                    bsky_uri = getattr(post_result, "uri", None)
                    remember_posted_tweet(
                        state, candidate, bsky_uri=bsky_uri
                    )
                    state = prune_state(state, max_entries=5000)
                    save_state(state, STATE_PATH)
                    # Feed the just-posted item back into the in-memory
                    # dedupe window so later candidates in this same cycle
                    # compare against it too.
                    recent_bsky_posts.insert(
                        0,
                        {
                            "uri": bsky_uri,
                            "text": raw_text,
                            "normalized_text": candidate["normalized_text"],
                            "canonical_non_x_urls": candidate[
                                "canonical_non_x_urls"
                            ],
                            "media_fingerprint": candidate["media_fingerprint"],
                            "text_media_key": candidate["text_media_key"],
                            "created_at": arrow.utcnow().isoformat(),
                        },
                    )
                    recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
                    new_posts += 1
                    if media_upload_failures:
                        logging.warning(
                            f"✅ Posted tweet to Bluesky with degraded media "
                            f"mode ({post_mode}). "
                            f"Failed media items: {media_upload_failures}"
                        )
                    else:
                        logging.info(
                            f"✅ Posted new tweet to Bluesky with mode "
                            f"{post_mode}: {raw_text}"
                        )
                    # Pause between posts — presumably to pace API usage;
                    # TODO confirm against PDS rate limits.
                    time.sleep(5)
                except Exception as e:
                    logging.error(
                        f"❌ Failed to post tweet to Bluesky: {e}"
                    )
            browser.close()
        logging.info(
            f"✅ Sync complete. Posted {new_posts} new updates."
        )
    except Exception as e:
        logging.error(f"❌ Error during sync cycle: {e}")
def main():
    """
    CLI entry point: parse arguments, resolve credentials, and run one
    sync cycle.

    Credentials may come from CLI flags or from environment variables
    (loaded from .env via load_dotenv); CLI flags take priority. Env vars
    are the recommended path for secrets — values passed on the command
    line are visible in the process list (`ps aux`).
    """
    load_dotenv()

    parser = argparse.ArgumentParser(
        description="Twitter to Bluesky Sync"
    )
    # Plain string options, declared as (flag, help) pairs. Secrets among
    # them (--twitter-password, --bsky-password) are kept for
    # compatibility; prefer env vars for them.
    plain_options = (
        ("--twitter-username", "Your Twitter login username"),
        ("--twitter-password", "Your Twitter login password"),
        ("--twitter-email", "Your Twitter email for security challenges"),
        ("--twitter-handle", "The Twitter account to scrape"),
        ("--bsky-handle", "Your Bluesky handle"),
        ("--bsky-password", "Your Bluesky app password"),
        (
            "--bsky-base-url",
            "Bluesky/ATProto PDS base URL, e.g. https://eurosky.social",
        ),
    )
    for flag, help_text in plain_options:
        parser.add_argument(flag, help=help_text)
    parser.add_argument(
        "--bsky-langs",
        help="Comma-separated language codes for Bluesky posts (default: ca)",
        default=None,
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help=(
            "Simulate sync without posting to Bluesky. "
            "Logs what would be posted."
        ),
    )
    args = parser.parse_args()

    # Resolve credentials: CLI args take priority, then env vars (the
    # secure path — CLI args expose secrets in the process list).
    def _or_env(current, env_name):
        # Fall back to the environment whenever the CLI value is falsy.
        return current or os.getenv(env_name)

    args.twitter_username = _or_env(args.twitter_username, "TWITTER_USERNAME")
    args.twitter_password = _or_env(args.twitter_password, "TWITTER_PASSWORD")
    args.twitter_email = _or_env(args.twitter_email, "TWITTER_EMAIL")
    args.bsky_handle = _or_env(args.bsky_handle, "BSKY_HANDLE")
    args.bsky_password = _or_env(args.bsky_password, "BSKY_APP_PASSWORD")
    args.twitter_handle = (
        args.twitter_handle
        or os.getenv("TWITTER_HANDLE")
        or args.twitter_username
    )
    if not args.bsky_base_url:
        args.bsky_base_url = DEFAULT_BSKY_BASE_URL

    # --- Language handling: CLI > env > default (Catalan) ---
    raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS")
    if raw_langs:
        args.bsky_langs = [
            code.strip()
            for code in raw_langs.split(",")
            if code.strip()
        ]
        logging.info(
            f"🌍 Using configured Bluesky languages: {args.bsky_langs}"
        )
    else:
        args.bsky_langs = DEFAULT_BSKY_LANGS
        logging.info(
            f"🌍 Using default Bluesky languages: {args.bsky_langs}"
        )

    # Bail out early with one consolidated message if any required
    # credential is still unset.
    required = (
        (args.twitter_username, "--twitter-username / TWITTER_USERNAME"),
        (args.twitter_password, "--twitter-password / TWITTER_PASSWORD"),
        (args.bsky_handle, "--bsky-handle / BSKY_HANDLE"),
        (args.bsky_password, "--bsky-password / BSKY_APP_PASSWORD"),
    )
    missing_args = [label for value, label in required if not value]
    if missing_args:
        logging.error(
            f"❌ Missing credentials! You forgot to provide: "
            f"{', '.join(missing_args)}"
        )
        return

    logging.info(f"🤖 Bot started. Will check @{args.twitter_handle}")
    logging.info(
        f"🌍 Posting destination base URL: {args.bsky_base_url}"
    )
    if args.dry_run:
        logging.info(
            "🧪 DRY RUN MODE ENABLED — no posts will be created."
        )
    reset_caches()
    sync_feeds(args)
    logging.info("🤖 Bot finished.")
# Script entry point: run a single sync cycle when executed directly.
if __name__ == "__main__":
    main()