diff --git a/twitter2bsky_daemon.py b/twitter2bsky_daemon.py index ab7f251..3f2fd1a 100644 --- a/twitter2bsky_daemon.py +++ b/twitter2bsky_daemon.py @@ -1,136 +1,1379 @@ import argparse import arrow -import fastfeedparser +import hashlib +import html +import io +import json import logging import re import httpx import time -import charset_normalizer -import sys import os -import io -import json -import hashlib -import html -from dataclasses import dataclass -from typing import Optional, List, Set, Dict, Any, Tuple -from urllib.parse import urlparse, urlunparse +import subprocess +import uuid +from urllib.parse import urlparse +from dotenv import load_dotenv from atproto import Client, client_utils, models +from playwright.sync_api import sync_playwright +from moviepy import VideoFileClip from bs4 import BeautifulSoup +from PIL import Image -try: - from PIL import Image - PIL_AVAILABLE = True -except ImportError: - Image = None - PIL_AVAILABLE = False +# --- Configuration --- +LOG_PATH = "twitter2bsky.log" +STATE_PATH = "twitter2bsky_state.json" +SCRAPE_TWEET_LIMIT = 30 +DEDUPE_BSKY_LIMIT = 30 +TWEET_MAX_AGE_DAYS = 3 +BSKY_TEXT_MAX_LENGTH = 275 +DEFAULT_BSKY_LANGS = ["ca"] + +VIDEO_MAX_DURATION_SECONDS = 179 +MAX_VIDEO_UPLOAD_SIZE_MB = 45 + +BSKY_IMAGE_MAX_BYTES = 950 * 1024 +BSKY_IMAGE_MAX_DIMENSION = 2000 +BSKY_IMAGE_MIN_JPEG_QUALITY = 45 + +EXTERNAL_THUMB_MAX_BYTES = 950 * 1024 +EXTERNAL_THUMB_MAX_DIMENSION = 1200 +EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40 + +BSKY_BLOB_UPLOAD_MAX_RETRIES = 5 +BSKY_BLOB_UPLOAD_BASE_DELAY = 10 +BSKY_BLOB_UPLOAD_MAX_DELAY = 300 +BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3 +BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15 + +BSKY_SEND_POST_MAX_RETRIES = 3 +BSKY_SEND_POST_BASE_DELAY = 5 +BSKY_SEND_POST_MAX_DELAY = 60 + +MEDIA_DOWNLOAD_TIMEOUT = 30 +LINK_METADATA_TIMEOUT = 10 +URL_RESOLVE_TIMEOUT = 12 +PLAYWRIGHT_RESOLVE_TIMEOUT_MS = 30000 +SUBPROCESS_TIMEOUT_SECONDS = 180 +FFPROBE_TIMEOUT_SECONDS = 15 +DEFAULT_BSKY_BASE_URL = "https://bsky.social" + 
+OG_TITLE_WAIT_TIMEOUT_MS = 7000 +PLAYWRIGHT_POST_GOTO_SLEEP_S = 2.0 +PLAYWRIGHT_IDLE_POLL_SLEEP_S = 0.8 +PLAYWRIGHT_IDLE_POLL_ROUNDS = 4 +PLAYWRIGHT_RETRY_SLEEP_S = 2.0 +VIDEO_PLAYER_WAIT_ROUNDS = 8 +VIDEO_PLAYER_RETRY_ROUNDS = 5 +URL_TAIL_MIN_PREFIX_CHARS = 35 +URL_TAIL_MAX_LOOKBACK_CHARS = 120 +URL_TAIL_MAX_CLAUSE_DISTANCE = 180 +DYNAMIC_ALT_MAX_LENGTH = 150 +TRUNCATE_MIN_PREFIX_CHARS = 20 +SHORT_TWEET_OG_FETCH_THRESHOLD = 35 +ORPHAN_DIGIT_MAX_DIGITS = 3 +SESSION_FILE_PERMISSIONS = 0o600 + +# --- Logging Setup --- +logging.basicConfig( + format="%(asctime)s [%(levelname)s] %(message)s", + handlers=[ + logging.FileHandler(LOG_PATH, encoding="utf-8"), + logging.StreamHandler(), + ], + level=logging.INFO, +) + +# --- Per-run caches --- +class _RunCache: + def __init__(self): + self.og_title: dict = {} + self.url_resolution: dict = {} + self.url_validity: dict = {} + + def clear(self): + self.og_title.clear() + self.url_resolution.clear() + self.url_validity.clear() + +_cache = _RunCache() -# ============================================================ -# Config -# ============================================================ -DEFAULT_STATE_PATH = "rss2bsky_state.json" -DEFAULT_COOLDOWN_STATE_PATH = "rss2bsky_cooldowns.json" +def reset_caches(): + _cache.clear() -@dataclass(frozen=True) -class LimitsConfig: - dedupe_bsky_limit: int = 30 - bsky_text_max_length: int = 275 - - external_thumb_max_bytes: int = 750 * 1024 - external_thumb_target_bytes: int = 500 * 1024 - external_thumb_max_dimension: int = 1000 - external_thumb_min_jpeg_quality: int = 35 - - state_max_entries: int = 5000 +# --- Custom Classes --- +class ScrapedMedia: + def __init__(self, url, media_type="photo"): + self.type = media_type + self.media_url_https = url -@dataclass(frozen=True) -class RetryConfig: - blob_upload_max_retries: int = 3 - blob_upload_base_delay: int = 8 - blob_upload_max_delay: int = 120 - blob_transient_error_retries: int = 2 - blob_transient_error_delay: int = 10 - 
post_retry_delay_seconds: int = 2 +class ScrapedTweet: + def __init__(self, created_on, text, media_urls, tweet_url=None, card_url=None, is_retweet=False): + self.created_on = created_on + self.text = text + self.tweet_url = tweet_url + self.card_url = card_url + self.is_retweet = is_retweet + self.media = [ScrapedMedia(url, media_type) for url, media_type in media_urls] -@dataclass(frozen=True) -class CooldownConfig: - default_post_cooldown_seconds: int = 3600 - default_thumb_cooldown_seconds: int = 1800 +# --- Helpers --- +def take_error_screenshot(page, error_msg): + logging.info(f"πŸ“Έ Taking screenshot... Shot: {error_msg}") + timestamp = time.strftime("%Y%m%d_%H%M%S") + screenshot_name = f"screenshot_{timestamp}.png" + page.screenshot(path=screenshot_name) + logging.info(f"πŸ“Έ Screenshot saved as: {screenshot_name}") -@dataclass(frozen=True) -class NetworkConfig: - http_timeout: int = 20 +def is_valid_url(url): + if url in _cache.url_validity: + return _cache.url_validity[url] + + try: + response = httpx.head(url, timeout=5, follow_redirects=True) + result = response.status_code < 500 + except Exception: + result = False + + _cache.url_validity[url] = result + return result -@dataclass(frozen=True) -class AppConfig: - limits: LimitsConfig = LimitsConfig() - retry: RetryConfig = RetryConfig() - cooldown: CooldownConfig = CooldownConfig() - network: NetworkConfig = NetworkConfig() +def strip_trailing_url_punctuation(url): + if not url: + return url + # Strip a trailing hashtag-style fragment (#Word) that is really a social + # hashtag glued to the end of a URL with no space, e.g. + # https://cit.transit.gencat.cat#SCT β†’ https://cit.transit.gencat.cat + # Only stripped when it starts with a letter so real anchors like + # /page#section-2 inside a longer sentence are left alone. 
+ url = re.sub(r"#[A-Za-z]\w*$", "", url.strip()) + return re.sub(r"[\s…\.,;:!?)\]\"']+$", "", url) -# ============================================================ -# Local models -# ============================================================ -@dataclass -class EntryCandidate: - item: Any - title_text: str - normalized_title: str - canonical_link: Optional[str] - published_at: Optional[str] - published_arrow: Any - entry_fingerprint: str - post_text_variants: List[str] +def split_url_hashtag_suffix(text): + """ + Split a URL that has a hashtag fragment glued to it with no space, e.g.: + 'https://cit.transit.gencat.cat#SCT' + becomes: + 'https://cit.transit.gencat.cat #SCT' + + Only splits when the fragment looks like a social hashtag: starts with # + followed by a letter then word characters. The lookahead (?=\\s|$) ensures + we only act at a word boundary so mid-sentence anchors followed by more + URL path are left untouched. + """ + if not text: + return text + + fixed = re.sub( + r"(https?://[^\s#<>\"']+)(#[A-Za-z]\w*)(?=\s|$)", + r"\1 \2", + text, + ) + if fixed != text: + logging.info("πŸ”§ Split hashtag suffix from URL in text") + return fixed -@dataclass -class RecentBskyPost: - uri: Optional[str] - text: str - normalized_text: str - canonical_non_x_urls: Set[str] - created_at: Optional[str] +def split_concatenated_urls(text): + if not text: + return text + + fixed = re.sub(r"(https?://[^\s]+?)(https?://)", r"\1 \2", text) + if fixed != text: + logging.info("πŸ”§ Split concatenated URLs in text") + return fixed -@dataclass -class RunResult: - published_count: int - stopped_reason: Optional[str] = None +def repair_broken_urls(text): + if not text: + return text + + original = text + text = split_concatenated_urls(text) + # Split glued hashtag suffixes before any rejoining passes + text = split_url_hashtag_suffix(text) + + text = re.sub(r"(https?://)\s*[\r\n]+\s*", r"\1", text, flags=re.IGNORECASE) + + prev_text = None + while prev_text != text: + prev_text = 
text + text = re.sub( + r"((?:https?://|www\.)[^\s<>\"]*)[\r\n]+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", + r"\1\2", + text, + flags=re.IGNORECASE, + ) + + text = re.sub( + r"((?:https?://|www\.)[^\s<>\"]*)\s+([A-Za-z0-9/\-._~%!$&'()*+,;=:@?#]+)", + r"\1\2", + text, + flags=re.IGNORECASE, + ) + + text = split_concatenated_urls(text) + # Run hashtag split again after rejoining passes β€” the rejoining regex + # contains # in its character class so it can re-glue a fragment. + text = split_url_hashtag_suffix(text) + + if text != original: + logging.info("πŸ”§ Repaired broken URL wrapping in scraped text") + + return text -# ============================================================ -# Logging -# ============================================================ -def setup_logging() -> None: - logging.basicConfig( - format="%(asctime)s %(message)s", - level=logging.INFO, - stream=sys.stdout +def repair_broken_mentions(text): + if not text: + return text + + lines = text.splitlines() + result = [] + i = 0 + changed = False + + def is_mention_only_line(s): + return bool(re.fullmatch(r"@[A-Za-z0-9_]+", s.strip())) + + def is_blank_line(s): + return not s.strip() + + while i < len(lines): + current = lines[i] + stripped = current.strip() + + if is_blank_line(current): + result.append("") + i += 1 + continue + + if is_mention_only_line(current): + if result and result[-1].strip(): + result[-1] = result[-1].rstrip() + " " + stripped + changed = True + else: + result.append(stripped) + + i += 1 + + while i < len(lines): + next_line = lines[i] + next_stripped = next_line.strip() + + if is_blank_line(next_line): + break + if is_mention_only_line(next_line): + break + + result[-1] = result[-1].rstrip() + " " + next_stripped + changed = True + i += 1 + + if i < len(lines) and is_blank_line(lines[i]): + break + + continue + + if i + 1 < len(lines) and is_mention_only_line(lines[i + 1]): + merged = stripped + " " + lines[i + 1].strip() + changed = True + i += 2 + + while i < len(lines): 
+ next_line = lines[i] + next_stripped = next_line.strip() + + if is_blank_line(next_line): + break + if is_mention_only_line(next_line): + break + + merged = merged.rstrip() + " " + next_stripped + changed = True + i += 1 + + if i < len(lines) and is_blank_line(lines[i]): + break + + result.append(merged) + continue + + result.append(stripped) + i += 1 + + new_text = "\n".join(result) + + if changed: + logging.info("πŸ”§ Repaired broken mention wrapping in scraped text") + + return new_text + + +def strip_line_edge_whitespace(text): + if not text: + return text + + lines = text.splitlines() + cleaned_lines = [] + changed = False + + for line in lines: + cleaned = line.strip() + if cleaned != line: + changed = True + cleaned_lines.append(cleaned) + + new_text = "\n".join(cleaned_lines) + + if changed: + logging.info("πŸ”§ Stripped leading/trailing whitespace from scraped text lines") + + return new_text + + +def remove_trailing_ellipsis_line(text): + if not text: + return text + + lines = text.splitlines() + + while lines and lines[-1].strip() in {"...", "…"}: + lines.pop() + + return "\n".join(lines).strip() + + +def remove_orphaned_digit_lines_before_hashtags(text): + if not text: + return text + + lines = text.splitlines() + if len(lines) < 2: + return text + + result = [] + changed = False + i = 0 + orphan_pattern = re.compile(rf"\d{{1,{ORPHAN_DIGIT_MAX_DIGITS}}}") + + while i < len(lines): + stripped = lines[i].strip() + + if ( + stripped + and orphan_pattern.fullmatch(stripped) + and i + 1 < len(lines) + and lines[i + 1].strip().startswith("#") + ): + logging.info(f"πŸ”§ Removing orphaned digit line '{stripped}' before hashtag line") + changed = True + i += 1 + continue + + result.append(lines[i]) + i += 1 + + if changed: + return "\n".join(result) + + return text + + +def clean_post_text(text): + raw_text = (text or "").strip() + raw_text = repair_broken_urls(raw_text) + raw_text = repair_broken_mentions(raw_text) + raw_text = 
strip_line_edge_whitespace(raw_text) + raw_text = remove_trailing_ellipsis_line(raw_text) + raw_text = remove_orphaned_digit_lines_before_hashtags(raw_text) + return raw_text.strip() + + +def clean_url(url): + trimmed_url = url.strip() + cleaned_url = re.sub(r"\s+", "", trimmed_url) + cleaned_url = strip_trailing_url_punctuation(cleaned_url) + + if is_valid_url(cleaned_url): + return cleaned_url + return None + + +def canonicalize_url(url): + if not url: + return None + return strip_trailing_url_punctuation(url.strip()) + + +def normalize_urlish_token(token): + if not token: + return None + + token = strip_trailing_url_punctuation(token.strip()) + if not token: + return None + + if token.startswith(("http://", "https://")): + return token + + if token.startswith("www."): + return f"https://{token}" + + return None + + +def canonicalize_tweet_url(url): + if not url: + return None + + url = url.strip() + match = re.search( + r"https?://(?:www\.)?(?:x\.com|twitter\.com)/([^/]+)/status/(\d+)", + url, + re.IGNORECASE, + ) + if not match: + return url.lower() + + handle = match.group(1).lower() + tweet_id = match.group(2) + return f"https://x.com/{handle}/status/{tweet_id}" + + +def extract_tweet_id(tweet_url): + if not tweet_url: + return None + match = re.search(r"/status/(\d+)", tweet_url) + if match: + return match.group(1) + return None + + +def make_unique_video_temp_base(tweet_url=None): + tweet_id = extract_tweet_id(tweet_url) or "unknown" + ts_ms = int(time.time() * 1000) + rand = uuid.uuid4().hex[:8] + base = f"temp_video_{tweet_id}_{ts_ms}_{rand}" + logging.info(f"🎞️ Using unique temp video base: {base}") + return base + + +def remove_file_quietly(path): + if path and os.path.exists(path): + try: + os.remove(path) + logging.info(f"🧹 Removed temp file: {path}") + except Exception as e: + logging.warning(f"⚠️ Could not remove temp file {path}: {e}") + + +def is_x_or_twitter_domain(url): + try: + normalized = normalize_urlish_token(url) or url + hostname = 
(urlparse(normalized).hostname or "").lower() + return hostname in { + "x.com", + "www.x.com", + "twitter.com", + "www.twitter.com", + "mobile.twitter.com", + } + except Exception: + return False + + +def is_tco_domain(url): + try: + normalized = normalize_urlish_token(url) or url + hostname = (urlparse(normalized).hostname or "").lower() + return hostname == "t.co" + except Exception: + return False + + +def is_external_non_x_url(url): + if not url: + return False + return (not is_tco_domain(url)) and (not is_x_or_twitter_domain(url)) + + +def extract_urls_from_text(text): + if not text: + return [] + + repaired = repair_broken_urls(text) + pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' + return re.findall(pattern, repaired) + + +def extract_quoted_text_from_og_title(og_title): + if not og_title: + return None + + decoded = html.unescape(og_title).strip() + match = re.search(r'on X:\s*"(?P.*)"\s*/\s*X\s*$', decoded, flags=re.DOTALL) + if match: + extracted = match.group("text").strip() + if extracted: + return extracted + + first_quote = decoded.find('"') + last_quote = decoded.rfind('"') + if 0 <= first_quote < last_quote: + extracted = decoded[first_quote + 1 : last_quote].strip() + if extracted: + return extracted + + return None + + +def should_fetch_og_title(tweet): + text = clean_post_text(tweet.text or "") + urls = extract_urls_from_text(text) + + if not text: + return True + + if any(is_tco_domain(normalize_urlish_token(u) or u) for u in urls): + return True + + if "…" in text or text.endswith("..."): + return True + + if len(text) < SHORT_TWEET_OG_FETCH_THRESHOLD: + return True + + return False + + +def fetch_tweet_og_title_text(tweet_url): + if not tweet_url: + return None + + if tweet_url in _cache.og_title: + logging.info(f"⚑ Using cached og:title text for {tweet_url}") + return _cache.og_title[tweet_url] + + browser = None + browser_context = None + page = None + + try: + logging.info(f"🧾 Fetching og:title from tweet page: {tweet_url}") + + with 
sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + browser_context = browser.new_context( + user_agent=( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ), + viewport={"width": 1280, "height": 900}, + ) + page = browser_context.new_page() + page.goto( + tweet_url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, + ) + + try: + page.wait_for_selector( + 'meta[property="og:title"]', + timeout=OG_TITLE_WAIT_TIMEOUT_MS, + ) + except Exception: + pass + + og_title = ( + page.locator('meta[property="og:title"]') + .first.get_attribute("content") + ) + extracted = extract_quoted_text_from_og_title(og_title) + + if extracted: + extracted = clean_post_text(extracted) + _cache.og_title[tweet_url] = extracted + logging.info(f"βœ… Extracted tweet text from og:title for {tweet_url}") + return extracted + + logging.info(f"ℹ️ No usable og:title text extracted for {tweet_url}") + _cache.og_title[tweet_url] = None + return None + + except Exception as e: + logging.warning( + f"⚠️ Could not extract og:title text from {tweet_url}: {repr(e)}" + ) + try: + if page: + take_error_screenshot(page, "tweet_og_title_failed") + except Exception: + pass + _cache.og_title[tweet_url] = None + return None + finally: + try: + if page: + page.close() + except Exception: + pass + try: + if browser_context: + browser_context.close() + except Exception: + pass + try: + if browser: + browser.close() + except Exception: + pass + + +def resolve_tco_with_httpx(url, http_client): + try: + response = http_client.get( + url, timeout=URL_RESOLVE_TIMEOUT, follow_redirects=True + ) + final_url = canonicalize_url(str(response.url)) + if final_url: + logging.info(f"πŸ”— Resolved t.co with httpx: {url} -> {final_url}") + return final_url + except Exception as e: + logging.warning(f"⚠️ httpx t.co resolution failed for 
{url}: {repr(e)}") + + return canonicalize_url(url) + + +def resolve_tco_with_playwright(url): + browser = None + browser_context = None + page = None + + try: + logging.info(f"🌐 Resolving t.co with Playwright: {url}") + + with sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + browser_context = browser.new_context( + user_agent=( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ), + viewport={"width": 1280, "height": 900}, + ) + page = browser_context.new_page() + + try: + page.goto( + url, + wait_until="domcontentloaded", + timeout=PLAYWRIGHT_RESOLVE_TIMEOUT_MS, + ) + except Exception as e: + logging.warning( + f"⚠️ Initial Playwright goto failed for {url}: {repr(e)}" + ) + + time.sleep(PLAYWRIGHT_POST_GOTO_SLEEP_S) + final_url = canonicalize_url(page.url) + + for _ in range(PLAYWRIGHT_IDLE_POLL_ROUNDS): + if final_url and is_external_non_x_url(final_url): + break + + try: + page.wait_for_load_state("networkidle", timeout=2000) + except Exception: + pass + + time.sleep(PLAYWRIGHT_IDLE_POLL_SLEEP_S) + final_url = canonicalize_url(page.url) + + logging.info(f"🌐 Playwright final URL for {url}: {final_url}") + return final_url + + except Exception as e: + logging.warning( + f"⚠️ Playwright t.co resolution failed for {url}: {repr(e)}" + ) + try: + if page: + take_error_screenshot(page, "tco_resolve_failed") + except Exception: + pass + finally: + try: + if page: + page.close() + except Exception: + pass + try: + if browser_context: + browser_context.close() + except Exception: + pass + try: + if browser: + browser.close() + except Exception: + pass + + return canonicalize_url(url) + + +def resolve_url_if_needed(url, http_client, allow_playwright_fallback=True): + if not url: + return None + + normalized = normalize_urlish_token(url) or url + cleaned = canonicalize_url(normalized) + if not cleaned: 
+ return None + + if cleaned in _cache.url_resolution: + logging.info( + f"⚑ Using cached URL resolution: {cleaned} -> {_cache.url_resolution[cleaned]}" + ) + return _cache.url_resolution[cleaned] + + if not is_tco_domain(cleaned): + _cache.url_resolution[cleaned] = cleaned + return cleaned + + resolved_http = resolve_tco_with_httpx(cleaned, http_client) + if is_external_non_x_url(resolved_http): + _cache.url_resolution[cleaned] = resolved_http + return resolved_http + + if not allow_playwright_fallback: + _cache.url_resolution[cleaned] = resolved_http + return resolved_http + + resolved_browser = resolve_tco_with_playwright(cleaned) + if is_external_non_x_url(resolved_browser): + logging.info( + f"βœ… Resolved t.co via Playwright to external URL: {resolved_browser}" + ) + _cache.url_resolution[cleaned] = resolved_browser + return resolved_browser + + if resolved_http and not is_tco_domain(resolved_http): + _cache.url_resolution[cleaned] = resolved_http + return resolved_http + + _cache.url_resolution[cleaned] = cleaned + return cleaned + + +def extract_non_x_urls_from_text(text): + urls = extract_urls_from_text(text) + result = [] + + for url in urls: + normalized = normalize_urlish_token(url) + cleaned = strip_trailing_url_punctuation(normalized or url) + if not cleaned: + continue + + if is_tco_domain(cleaned): + result.append(cleaned) + continue + + if not is_x_or_twitter_domain(cleaned): + result.append(cleaned) + + return result + + +def extract_ordered_non_x_urls(text): + seen = set() + ordered = [] + + for url in extract_non_x_urls_from_text(text): + canonical = canonicalize_url(url) + if canonical and canonical not in seen: + seen.add(canonical) + ordered.append(canonical) + + return ordered + + +def extract_first_visible_non_x_url(text): + for url in extract_non_x_urls_from_text(text or ""): + canonical = canonicalize_url(url) + if canonical: + return canonical + return None + + +def extract_first_resolved_external_url( + text, http_client, 
allow_playwright_fallback=True +): + for url in extract_non_x_urls_from_text(text or ""): + resolved = resolve_url_if_needed( + url, + http_client, + allow_playwright_fallback=allow_playwright_fallback, + ) + if not resolved: + continue + + if is_external_non_x_url(resolved): + logging.info(f"βœ… Selected resolved external URL for card: {resolved}") + return resolved + + return None + + +def resolve_card_url(card_url, http_client): + if not card_url: + return None + + cleaned = canonicalize_url(card_url.strip()) + if not cleaned: + return None + + if is_external_non_x_url(cleaned): + logging.info(f"πŸ”— Card URL is already external: {cleaned}") + return cleaned + + if is_tco_domain(cleaned): + resolved = resolve_url_if_needed( + cleaned, http_client, allow_playwright_fallback=True + ) + if resolved and is_external_non_x_url(resolved): + logging.info(f"πŸ”— Resolved card t.co URL: {cleaned} -> {resolved}") + return resolved + + if is_x_or_twitter_domain(cleaned): + logging.info( + f"ℹ️ Card URL resolves to X/Twitter domain, ignoring: {cleaned}" + ) + return None + + return cleaned + + +def sanitize_visible_urls_in_text(text, http_client, has_media=False): + if not text: + return text, None + + working = clean_post_text(text) + url_pattern = r'(?:(?:https?://)|(?:www\.))[^\s<>"\']+' + urls = re.findall(url_pattern, working) + + if not urls: + return working, None + + replacements = {} + first_external_resolved = None + + for raw_url in urls: + normalized = normalize_urlish_token(raw_url) + cleaned = canonicalize_url(normalized or raw_url) + if not cleaned: + continue + + if is_x_or_twitter_domain(cleaned): + replacements[raw_url] = "" + logging.info(f"🧹 Removing X/Twitter URL from visible text: {cleaned}") + continue + + final_url = cleaned + if is_tco_domain(cleaned): + resolved_http_first = resolve_tco_with_httpx(cleaned, http_client) + + if is_external_non_x_url(resolved_http_first): + final_url = resolved_http_first + _cache.url_resolution[cleaned] = final_url + 
else: + if ( + has_media + and resolved_http_first + and is_x_or_twitter_domain(resolved_http_first) + ): + final_url = resolved_http_first + _cache.url_resolution[cleaned] = final_url + logging.info( + f"⚑ Skipping Playwright t.co fallback because tweet has media " + f"and httpx already resolved to X/Twitter URL: {final_url}" + ) + else: + final_url = resolve_url_if_needed( + cleaned, http_client, allow_playwright_fallback=True + ) + + if is_x_or_twitter_domain(final_url): + replacements[raw_url] = "" + logging.info( + f"🧹 Removing resolved X/Twitter URL from visible text: {final_url}" + ) + continue + + if normalized and normalized.startswith("https://www."): + final_url = normalized + elif normalized and normalized.startswith("http://www."): + final_url = normalized + + if is_external_non_x_url(final_url) and not first_external_resolved: + first_external_resolved = final_url + + replacements[raw_url] = final_url + + def replace_match(match): + raw = match.group(0) + return replacements.get(raw, raw) + + working = re.sub(url_pattern, replace_match, working) + + deduped_lines = [] + for line in working.splitlines(): + line_urls = re.findall(url_pattern, line) + if len(line_urls) > 1: + prefix = re.sub(url_pattern, "", line).strip() + kept_urls = [] + seen_in_line: set = set() + + for url in line_urls: + normalized = normalize_urlish_token(url) or url + canonical = canonicalize_url(normalized) + + if not canonical: + continue + if is_x_or_twitter_domain(canonical): + continue + if canonical in seen_in_line: + continue + + seen_in_line.add(canonical) + kept_urls.append(url) + + if prefix and kept_urls: + rebuilt = prefix + " " + " ".join(kept_urls) + elif prefix: + rebuilt = prefix + else: + rebuilt = " ".join(kept_urls) + + deduped_lines.append(rebuilt.strip()) + else: + cleaned_line = re.sub(r"\s{2,}", " ", line).strip() + deduped_lines.append(cleaned_line) + + working = "\n".join(deduped_lines) + working = re.sub(r"[ \t]+", " ", working) + working = 
re.sub(r"\n{3,}", "\n\n", working).strip() + + return working, first_external_resolved + + +def build_effective_tweet_text(tweet, http_client): + scraped_text = clean_post_text(tweet.text or "") + has_media = bool(tweet.media) + og_title_text = None + + if should_fetch_og_title(tweet): + og_title_text = fetch_tweet_og_title_text(tweet.tweet_url) + + candidate_text = scraped_text + if og_title_text: + scraped_urls = extract_urls_from_text(scraped_text) + og_urls = extract_urls_from_text(og_title_text) + + if len(og_title_text) >= len(scraped_text) or (og_urls and not scraped_urls): + candidate_text = og_title_text + logging.info("🧾 Using og:title-derived tweet text as primary content") + + candidate_text, resolved_primary_external_url = sanitize_visible_urls_in_text( + candidate_text, http_client, has_media=has_media, + ) + candidate_text = clean_post_text(candidate_text) + + resolved_card_url = resolve_card_url( + getattr(tweet, "card_url", None), http_client + ) + + if resolved_card_url and is_external_non_x_url(resolved_card_url): + if not resolved_primary_external_url: + resolved_primary_external_url = resolved_card_url + logging.info( + f"πŸ”— Using resolved card URL as primary external URL: {resolved_card_url}" + ) + elif resolved_primary_external_url != resolved_card_url: + logging.info( + f"ℹ️ Card URL ({resolved_card_url}) differs from text URL " + f"({resolved_primary_external_url}). Preferring card URL for external embed." 
+ ) + resolved_primary_external_url = resolved_card_url + + if not resolved_primary_external_url: + resolved_primary_external_url = extract_first_resolved_external_url( + candidate_text, + http_client, + allow_playwright_fallback=not has_media, + ) + + return candidate_text, resolved_primary_external_url + + +def remove_url_from_visible_text(text, url_to_remove): + if not text or not url_to_remove: + return text + + canonical_target = canonicalize_url(url_to_remove) + lines = text.splitlines() + cleaned_lines = [] + + for line in lines: + line_urls = extract_urls_from_text(line) + new_line = line + + for url in line_urls: + normalized = normalize_urlish_token(url) or url + cleaned_candidate = canonicalize_url( + strip_trailing_url_punctuation(normalized) + ) + if cleaned_candidate == canonical_target: + pattern = re.escape(url) + new_line = re.sub(pattern, "", new_line) + + new_line = re.sub(r"[ \t]+", " ", new_line).strip() + cleaned_lines.append(new_line) + + result = "\n".join(cleaned_lines) + result = re.sub(r"[ \t]+", " ", result) + result = re.sub(r"\n{3,}", "\n\n", result).strip() + + return result + + +def looks_like_title_plus_url_post(text): + if not text: + return False + + repaired = repair_broken_urls(text) + repaired = strip_line_edge_whitespace(repaired) + lines = [line.strip() for line in repaired.splitlines() if line.strip()] + if len(lines) < 2: + return False + + last_line = lines[-1] + urls_in_last_line = extract_ordered_non_x_urls(last_line) + total_urls = extract_ordered_non_x_urls(repaired) + + return ( + len(urls_in_last_line) == 1 + and len(total_urls) == 1 + and last_line.startswith(("http://", "https://", "www.")) ) -# ============================================================ -# State + cooldown -# ============================================================ -def default_state() -> Dict[str, Any]: +def looks_like_url_and_tag_tail(text, primary_non_x_url=None): + if not text or not primary_non_x_url: + return False + + repaired = 
repair_broken_urls(text) + idx = repaired.find(primary_non_x_url) + if idx == -1: + return False + + tail = repaired[idx:].strip() + if not tail.startswith(("http://", "https://", "www.")): + return False + + if re.search(r"(?:https?://|www\.)\S+.*#[^\s#]+", tail): + return True + + return False + + +def find_tail_preservation_start(text, primary_non_x_url): + if not text or not primary_non_x_url: + return None + + url_pos = text.find(primary_non_x_url) + if url_pos == -1: + return None + + hashtag_match = re.search(r"\s#[^\s#]+", text[url_pos:]) + has_hashtag_after_url = hashtag_match is not None + + candidates = [url_pos] + + clause_patterns = [ + r"\.\s+", r":\s+", r";\s+", r"!\s+", r"\?\s+", r",\s+", + ] + + before = text[:url_pos] + for pattern in clause_patterns: + for match in re.finditer(pattern, before): + candidates.append(match.end()) + + last_newline = before.rfind("\n") + if last_newline != -1: + candidates.append(last_newline + 1) + + if has_hashtag_after_url: + generous_start = max(0, url_pos - URL_TAIL_MAX_LOOKBACK_CHARS) + while generous_start > 0 and text[generous_start] not in {" ", "\n"}: + generous_start -= 1 + candidates.append(generous_start) + + reasonable_candidates = [ + c for c in candidates + if 0 <= c < url_pos and (url_pos - c) <= URL_TAIL_MAX_CLAUSE_DISTANCE + ] + + if reasonable_candidates: + start = min(reasonable_candidates, key=lambda c: (url_pos - c)) + if url_pos - start < URL_TAIL_MIN_PREFIX_CHARS: + farther = [ + c for c in reasonable_candidates + if url_pos - c >= URL_TAIL_MIN_PREFIX_CHARS + ] + if farther: + start = min(farther, key=lambda c: (url_pos - c)) + return start + + return url_pos + + +def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH): + if len(text) <= max_length: + return text + + truncated = text[: max_length - 3] + last_space = truncated.rfind(" ") + if last_space > TRUNCATE_MIN_PREFIX_CHARS: + return truncated[:last_space] + "..." + return truncated + "..." 
def truncate_text_preserving_tail(text, tail_start, max_length=BSKY_TEXT_MAX_LENGTH):
    """Truncate ``text`` while keeping the tail starting at ``tail_start``
    (URL + hashtags) intact, joining prefix and tail with '... '."""
    if not text or tail_start is None or not (0 <= tail_start < len(text)):
        return truncate_text_safely(text, max_length)

    if len(text) <= max_length:
        return text

    tail = text[tail_start:].strip()
    if not tail:
        return truncate_text_safely(text, max_length)

    reserve = len(tail) + 4  # room for the "... " separator plus the tail
    if reserve >= max_length:
        # The tail alone overflows: keep its end, dropping a leading fragment.
        short_tail = tail[-(max_length - 3):].strip()
        space_idx = short_tail.find(" ")
        if 0 <= space_idx <= 30:
            short_tail = short_tail[space_idx + 1:].strip()
        return f"...{short_tail}"

    budget = max_length - reserve
    prefix = text[:tail_start].rstrip()
    if len(prefix) > budget:
        prefix = prefix[:budget].rstrip()
        space_idx = prefix.rfind(" ")
        if space_idx > 20:
            prefix = prefix[:space_idx].rstrip()

    combined = f"{prefix}... {tail}".strip()
    combined = re.sub(r"[ \t]+", " ", combined)
    combined = re.sub(r"\n{3,}", "\n\n", combined).strip()

    if len(combined) <= max_length:
        return combined
    return truncate_text_safely(text, max_length)


def choose_final_visible_text(
    full_clean_text, primary_non_x_url=None, prefer_full_text_without_url=True
):
    """Pick the visible Bluesky text: exact text when it fits, else try to
    preserve the URL/hashtag tail, else drop the external URL (an external
    card carries it), else plain truncation."""
    text = clean_post_text(full_clean_text or "")
    if not text:
        return text

    if len(text) <= BSKY_TEXT_MAX_LENGTH:
        logging.info(
            "🟒 Original cleaned tweet text fits in Bluesky. Preserving exact text."
        )
        return text

    if primary_non_x_url:
        tail_start = find_tail_preservation_start(text, primary_non_x_url)
        if tail_start is not None:
            preserved = truncate_text_preserving_tail(
                text, tail_start, BSKY_TEXT_MAX_LENGTH
            )
            if preserved and len(preserved) <= BSKY_TEXT_MAX_LENGTH:
                logging.info(
                    "πŸ”— Preserving meaningful ending block with URL/hashtags in visible Bluesky text"
                )
                return preserved

        if prefer_full_text_without_url and not looks_like_url_and_tag_tail(
            text, primary_non_x_url
        ):
            without_url = remove_url_from_visible_text(
                text, primary_non_x_url
            ).strip()
            if without_url and len(without_url) <= BSKY_TEXT_MAX_LENGTH:
                logging.info(
                    "πŸ”— Keeping full visible text by removing long external URL from body and using external card"
                )
                return without_url

    result = truncate_text_safely(text, BSKY_TEXT_MAX_LENGTH)
    logging.info("βœ‚οΈ Falling back to safe truncation for visible Bluesky text")
    return result


def normalize_post_text(text):
    """Normalize to a lowercase single-spaced form for duplicate comparison."""
    if not text:
        return ""
    collapsed = clean_post_text(text).replace("\r", "\n")
    return re.sub(r"\s+", " ", collapsed).strip().lower()


def build_media_fingerprint(tweet):
    """Stable sha256 over the tweet's media identifiers (order-independent)."""
    if not tweet or not tweet.media:
        return "no-media"

    parts = []
    for media in tweet.media:
        kind = getattr(media, "type", "unknown")
        url = getattr(media, "media_url_https", "") or ""
        stable = url
        if kind == "photo":
            # Strip size/format query noise so re-scrapes hash identically.
            stable = re.sub(r"[?&]name=\w+", "", stable)
            stable = re.sub(r"[?&]format=\w+", "", stable)
        elif kind == "video":
            stable = canonicalize_tweet_url(tweet.tweet_url or url or "")
        parts.append(f"{kind}:{stable}")

    parts.sort()
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()


def build_bsky_media_fingerprint(post_view):
    """Stable sha256 over a Bluesky post's embed refs; 'no-media' on none/error."""
    try:
        embed = getattr(post_view, "embed", None)
        if not embed:
            return "no-media"

        parts = []
        for img in getattr(embed, "images", None) or []:
            image_obj = getattr(img, "image", None)
            ref = (
                getattr(image_obj, "ref", None)
                or getattr(image_obj, "cid", None)
                or str(image_obj)
            )
            parts.append(f"photo:{ref}")

        video = getattr(embed, "video", None)
        if video:
            ref = (
                getattr(video, "ref", None)
                or getattr(video, "cid", None)
                or str(video)
            )
            parts.append(f"video:{ref}")

        external = getattr(embed, "external", None)
        if external:
            uri = getattr(external, "uri", None) or str(external)
            parts.append(f"external:{uri}")

        if not parts:
            return "no-media"

        parts.sort()
        return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()

    except Exception as e:
        logging.debug(f"Could not build Bluesky media fingerprint: {e}")
        return "no-media"


def build_text_media_key(normalized_text, media_fingerprint):
    """Combined dedupe key over normalized text and media fingerprint."""
    combined = f"{normalized_text}||{media_fingerprint}"
    return hashlib.sha256(combined.encode("utf-8")).hexdigest()


def create_bsky_client(base_url, handle, password):
    """Log into Bluesky, tolerating atproto versions without base_url kwarg."""
    normalized_base_url = (base_url or DEFAULT_BSKY_BASE_URL).strip().rstrip("/")
    logging.info(f"πŸ” Connecting Bluesky client via base URL: {normalized_base_url}")

    try:
        client = Client(base_url=normalized_base_url)
    except TypeError:
        logging.warning(
            "⚠️ Your atproto Client does not accept base_url in constructor. Falling back."
        )
        client = Client()
        try:
            # Best-effort: poke the attribute the installed version exposes.
            if hasattr(client, "base_url"):
                client.base_url = normalized_base_url
            elif hasattr(client, "_base_url"):
                client._base_url = normalized_base_url
        except Exception as e:
            logging.warning(f"⚠️ Could not apply custom base URL cleanly: {e}")

    client.login(handle, password)
    return client


# --- State Management ---
def default_state():
    """Fresh, empty persisted-state structure."""
    return {
        "version": 1,
        "posted_tweets": {},
        "posted_by_bsky_uri": {},
        "updated_at": None,
    }


def load_state(state_path=STATE_PATH):
    """Load persisted state, reinitializing on any missing/invalid file.

    NOTE(review): the file-open line sits between diff hunks and is not
    visible here; the utf-8 read below is reconstructed — confirm.
    """
    if not os.path.exists(state_path):
        logging.info(
            f"🧠 No state file found at {state_path}. Starting with empty memory."
        )
        return default_state()

    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)

        if not isinstance(state, dict):
            logging.warning("⚠️ State file is invalid. Reinitializing.")
            return default_state()

        state.setdefault("version", 1)
        state.setdefault("posted_tweets", {})
        state.setdefault("posted_by_bsky_uri", {})
        state.setdefault("updated_at", None)
        return state

    except Exception as e:
        logging.warning(
            f"⚠️ Could not load state file {state_path}: {e}. Reinitializing."
        )
        return default_state()


def save_state(state, state_path=STATE_PATH):
    """Atomically persist state (tmp file + os.replace).

    NOTE(review): the middle of this function falls between diff hunks; the
    json.dump/os.replace sequence is reconstructed from the identical pattern
    visible elsewhere in this file — confirm against the full file.
    """
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        temp_path = f"{state_path}.tmp"
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, state_path)
    except Exception as e:
        logging.error(f"❌ Failed to save state file {state_path}: {e}")


def remember_posted_tweet(state, candidate, bsky_uri=None):
    """Record a posted tweet under its canonical URL (or a text/media key)."""
    canonical_tweet_url = candidate.get("canonical_tweet_url")
    state_key = canonical_tweet_url or f"textmedia:{candidate['text_media_key']}"

    state["posted_tweets"][state_key] = {
        "canonical_tweet_url": canonical_tweet_url,
        "normalized_text": candidate["normalized_text"],
        "raw_text": candidate["raw_text"],
        "full_clean_text": candidate.get("full_clean_text", candidate["raw_text"]),
        "media_fingerprint": candidate["media_fingerprint"],
        "text_media_key": candidate["text_media_key"],
        "canonical_non_x_urls": sorted(candidate["canonical_non_x_urls"]),
        "ordered_non_x_urls": candidate.get("ordered_non_x_urls", []),
        "resolved_primary_external_url": candidate.get("resolved_primary_external_url"),
        "bsky_uri": bsky_uri,
        "tweet_created_on": candidate["tweet"].created_on,
        "tweet_url": candidate["tweet"].tweet_url,
        "posted_at": arrow.utcnow().isoformat(),
    }

    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key


def candidate_matches_state(candidate, state):
    """Return (matched, reason) checking URL, then text+media key, then text."""
    posted_tweets = state.get("posted_tweets", {})

    canonical_tweet_url = candidate["canonical_tweet_url"]
    if canonical_tweet_url and canonical_tweet_url in posted_tweets:
        return True, "state:tweet_url"

    text_media_key = candidate["text_media_key"]
    for record in posted_tweets.values():
        if record.get("text_media_key") == text_media_key:
            return True, "state:text_media_fingerprint"

    normalized_text = candidate["normalized_text"]
    for record in posted_tweets.values():
        if record.get("normalized_text") == normalized_text:
            return True, "state:normalized_text"

    return False, None


def prune_state(state, max_entries=5000):
    """Keep only the most recently posted max_entries records (by posted_at)."""
    posted_tweets = state.get("posted_tweets", {})
    if len(posted_tweets) <= max_entries:
        return state

    ranked = sorted(
        posted_tweets.items(),
        key=lambda kv: kv[1].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = {key for key, _ in ranked[:max_entries]}

    state["posted_tweets"] = {
        key: record for key, record in posted_tweets.items() if key in keep_keys
    }
    state["posted_by_bsky_uri"] = {
        uri: key
        for uri, key in state.get("posted_by_bsky_uri", {}).items()
        if key in keep_keys
    }
    return state
# --- Bluesky Feed Helpers ---
def extract_urls_from_facets(record):
    """Collect link URIs embedded in a Bluesky record's rich-text facets.

    Returns a list of URI strings; falsy/absent features are skipped, and any
    unexpected shape is swallowed (best-effort helper for dedupe only).
    """
    collected = []
    try:
        for facet in getattr(record, "facets", None) or []:
            for feature in getattr(facet, "features", None) or []:
                uri = getattr(feature, "uri", None)
                if uri:
                    collected.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")

    return collected
def get_recent_bsky_posts(client, handle, limit=30):
    """Fetch the author's latest original posts (no reposts/replies) with the
    derived fields used for live duplicate detection.

    Returns a list of dicts; on any feed-level failure, returns what was
    gathered so far and logs that live dedup is disabled for this cycle.
    """
    recent_posts = []

    try:
        timeline = client.get_author_feed(handle, limit=limit)

        for item in timeline.feed:
            try:
                if item.reason is not None:  # repost/boost — not ours to dedupe
                    continue

                record = item.post.record
                if getattr(record, "reply", None) is not None:  # skip replies
                    continue

                text = getattr(record, "text", "") or ""
                normalized_text = normalize_post_text(text)

                found_urls = []
                found_urls.extend(extract_non_x_urls_from_text(text))
                found_urls.extend(extract_urls_from_facets(record))

                canonical_non_x_urls = set()
                for url in found_urls:
                    if is_tco_domain(url) or is_x_or_twitter_domain(url):
                        continue
                    canonical = canonicalize_url(normalize_urlish_token(url) or url)
                    if canonical:
                        canonical_non_x_urls.add(canonical)

                media_fingerprint = build_bsky_media_fingerprint(item.post)

                recent_posts.append(
                    {
                        "uri": getattr(item.post, "uri", None),
                        "text": text,
                        "normalized_text": normalized_text,
                        "canonical_non_x_urls": canonical_non_x_urls,
                        "media_fingerprint": media_fingerprint,
                        "text_media_key": build_text_media_key(
                            normalized_text, media_fingerprint
                        ),
                        "created_at": getattr(record, "created_at", None),
                    }
                )

            except Exception as e:
                logging.debug(
                    f"Skipping one Bluesky feed item during dedupe fetch: {e}"
                )

    except Exception as e:
        logging.warning(
            f"⚠️ Could not fetch recent Bluesky posts for duplicate detection "
            f"(live dedup disabled for this cycle): {e}"
        )

    return recent_posts
int(state.get("post_creation_cooldown_until", 0) or 0) - - -def get_global_thumb_cooldown_until(cooldown_path: str) -> int: - state = load_cooldown_state(cooldown_path) - return int(state.get("thumb_upload_cooldown_until", 0) or 0) - - -def is_global_post_cooldown_active(cooldown_path: str) -> bool: - return int(time.time()) < get_global_post_cooldown_until(cooldown_path) - - -def is_global_thumb_cooldown_active(cooldown_path: str) -> bool: - return int(time.time()) < get_global_thumb_cooldown_until(cooldown_path) - - -def set_global_post_cooldown_until(reset_ts: int, cooldown_path: str) -> int: - state = load_cooldown_state(cooldown_path) - current = int(state.get("post_creation_cooldown_until", 0) or 0) - if reset_ts > current: - state["post_creation_cooldown_until"] = int(reset_ts) - save_cooldown_state(state, cooldown_path) - return int(load_cooldown_state(cooldown_path).get("post_creation_cooldown_until", 0) or 0) - - -def set_global_thumb_cooldown_until(reset_ts: int, cooldown_path: str) -> int: - state = load_cooldown_state(cooldown_path) - current = int(state.get("thumb_upload_cooldown_until", 0) or 0) - if reset_ts > current: - state["thumb_upload_cooldown_until"] = int(reset_ts) - save_cooldown_state(state, cooldown_path) - return int(load_cooldown_state(cooldown_path).get("thumb_upload_cooldown_until", 0) or 0) - - -def format_cooldown_until(ts: int) -> str: - return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts)) - - -def check_post_cooldown_or_log(cooldown_path: str) -> bool: - if is_global_post_cooldown_active(cooldown_path): - reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) - logging.warning(f"🟑 === BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}") - return True - return False - - -def check_thumb_cooldown_or_log(cooldown_path: str) -> bool: - if is_global_thumb_cooldown_active(cooldown_path): - reset_str = format_cooldown_until(get_global_thumb_cooldown_until(cooldown_path)) - 
# --- Upload / Retry Helpers ---
def get_rate_limit_wait_seconds(error_obj, default_delay):
    """Derive a wait from the error's ratelimit-reset header, clamped to the
    blob-upload max delay; fall back to ``default_delay`` on any problem."""
    try:
        headers = getattr(error_obj, "headers", None)
        if headers:
            reset_value = headers.get("ratelimit-reset") or headers.get(
                "RateLimit-Reset"
            )
            if reset_value:
                wait = max(int(reset_value) - int(time.time()) + 1, default_delay)
                return min(wait, BSKY_BLOB_UPLOAD_MAX_DELAY)
    except Exception:
        pass

    return default_delay


def is_transient_error(error_obj):
    """True when the error's repr looks like a retryable network/server fault."""
    signals = (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    )
    text = repr(error_obj)
    return any(signal in text for signal in signals)


def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload a blob to Bluesky with exponential backoff on rate limits and a
    separate budget for transient network faults. Returns the blob or None."""
    last_exception = None
    transient_attempts = 0

    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            return client.upload_blob(binary_data).blob

        except Exception as e:
            last_exception = e
            error_text = str(e)
            rate_limited = "429" in error_text or "RateLimitExceeded" in error_text

            if rate_limited:
                backoff = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY,
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff)

                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(
                        f"⏳ Bluesky blob upload rate-limited for {media_label}. "
                        f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue

                logging.warning(
                    f"❌ Exhausted blob upload retries for {media_label} "
                    f"after rate limiting: {repr(e)}"
                )
                break

            if (
                is_transient_error(e)
                and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES
            ):
                transient_attempts += 1
                wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(
                    f"⏳ Transient blob upload failure for {media_label}: {repr(e)}. "
                    f"Transient retry {transient_attempts}/"
                    f"{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue

            # Non-retryable: log whatever detail the response exposes and bail.
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            if getattr(e, "response", None) is not None:
                try:
                    logging.warning(
                        f"Upload response status: {e.response.status_code}"
                    )
                    logging.warning(f"Upload response body: {e.response.text}")
                except Exception:
                    pass
            return None

    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None


def send_post_with_retry(client, **kwargs):
    """client.send_post with backoff on rate limits and transient faults.
    Re-raises the final error so the caller decides how to fail."""
    last_exception = None

    for attempt in range(1, BSKY_SEND_POST_MAX_RETRIES + 1):
        try:
            return client.send_post(**kwargs)

        except Exception as e:
            last_exception = e
            error_text = str(e)
            rate_limited = "429" in error_text or "RateLimitExceeded" in error_text

            if rate_limited:
                backoff = min(
                    BSKY_SEND_POST_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_SEND_POST_MAX_DELAY,
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff)

                if attempt < BSKY_SEND_POST_MAX_RETRIES:
                    logging.warning(
                        f"⏳ Bluesky send_post rate-limited. "
                        f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue

                logging.error(
                    f"❌ Exhausted send_post retries after rate limiting: {repr(e)}"
                )
                raise

            if is_transient_error(e) and attempt < BSKY_SEND_POST_MAX_RETRIES:
                wait_seconds = BSKY_SEND_POST_BASE_DELAY * attempt
                logging.warning(
                    f"⏳ Transient send_post failure: {repr(e)}. "
                    f"Retry {attempt}/{BSKY_SEND_POST_MAX_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue

            raise

    raise last_exception


def compress_post_image_to_limit(image_bytes, max_bytes=BSKY_IMAGE_MAX_BYTES):
    """Re-encode an image as progressive JPEG under ``max_bytes``, first
    walking down a quality ladder, then shrinking dimensions. None on failure."""

    def _encode(im, quality):
        # One JPEG encode attempt at the given quality.
        buf = io.BytesIO()
        im.save(buf, format="JPEG", quality=quality, optimize=True, progressive=True)
        return buf.getvalue()

    def _shrink(im, limit_dim):
        # Downscale so the longest side is <= limit_dim; report whether we did.
        w, h = im.size
        longest = max(w, h)
        if longest <= limit_dim:
            return im, (w, h), False
        scale = limit_dim / longest
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        return im.resize(new_size, Image.LANCZOS), new_size, True

    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            img = img.convert("RGB")

            img, new_size, resized = _shrink(img, BSKY_IMAGE_MAX_DIMENSION)
            if resized:
                logging.info(
                    f"πŸ–ΌοΈ Resized post image to {new_size[0]}x{new_size[1]}"
                )

            for quality in (90, 82, 75, 68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY):
                data = _encode(img, quality)
                logging.info(
                    f"πŸ–ΌοΈ Post image candidate size at JPEG quality {quality}: "
                    f"{len(data)} bytes ({len(data) / 1024:.2f} KB)"
                )
                if len(data) <= max_bytes:
                    return data

            for target_dim in (1800, 1600, 1400, 1200, 1000):
                smaller, _, _ = _shrink(img.copy(), target_dim)
                for quality in (68, 60, 52, BSKY_IMAGE_MIN_JPEG_QUALITY):
                    data = _encode(smaller, quality)
                    logging.info(
                        f"πŸ–ΌοΈ Post image resized to <= {target_dim}px at quality {quality}: "
                        f"{len(data)} bytes ({len(data) / 1024:.2f} KB)"
                    )
                    if len(data) <= max_bytes:
                        return data

    except Exception as e:
        logging.warning(f"Could not compress post image: {repr(e)}")

    return None


def get_blob_from_url(media_url, client, http_client):
    """Download media and upload it to Bluesky, compressing oversize images.
    Returns the uploaded blob or None."""
    try:
        r = http_client.get(
            media_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True
        )
        if r.status_code != 200:
            logging.warning(
                f"Could not fetch media {media_url}: HTTP {r.status_code}"
            )
            return None

        content = r.content
        if not content:
            logging.warning(
                f"Could not fetch media {media_url}: empty response body"
            )
            return None

        content_type = (r.headers.get("content-type") or "").lower()
        upload_bytes = content

        if content_type.startswith("image/"):
            original_size = len(content)
            logging.info(
                f"πŸ–ΌοΈ Downloaded post image {media_url} "
                f"({original_size} bytes / {original_size / 1024:.2f} KB)"
            )

            if original_size > BSKY_IMAGE_MAX_BYTES:
                logging.info(
                    f"πŸ–ΌοΈ Post image exceeds safe Bluesky limit "
                    f"({original_size} bytes > {BSKY_IMAGE_MAX_BYTES} bytes). Compressing..."
                )
                compressed = compress_post_image_to_limit(
                    content, BSKY_IMAGE_MAX_BYTES
                )
                if not compressed:
                    logging.warning(
                        f"⚠️ Could not compress post image to safe limit: {media_url}"
                    )
                    return None
                upload_bytes = compressed
                logging.info(
                    f"βœ… Post image compressed to {len(upload_bytes)} bytes "
                    f"({len(upload_bytes) / 1024:.2f} KB)"
                )

        return upload_blob_with_retry(client, upload_bytes, media_label=media_url)

    except Exception as e:
        logging.warning(f"Could not fetch media {media_url}: {repr(e)}")
        return None


def get_blob_from_file(file_path, client):
    """Upload a local file (rejecting oversize .mp4) to Bluesky. None on failure."""
    try:
        if not os.path.exists(file_path):
            logging.warning(
                f"Could not upload local file {file_path}: file does not exist"
            )
            return None

        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        logging.info(
            f"πŸ“¦ Uploading local file {file_path} ({file_size_mb:.2f} MB)"
        )

        if (
            file_path.lower().endswith(".mp4")
            and file_size_mb > MAX_VIDEO_UPLOAD_SIZE_MB
        ):
            logging.warning(
                f"Could not upload local file {file_path}: "
                f"file too large ({file_size_mb:.2f} MB > {MAX_VIDEO_UPLOAD_SIZE_MB} MB)"
            )
            return None

        with open(file_path, "rb") as f:
            binary_data = f.read()

        return upload_blob_with_retry(client, binary_data, media_label=file_path)

    except Exception as e:
        logging.warning(f"Could not upload local file {file_path}: {repr(e)}")
        if getattr(e, "response", None) is not None:
            try:
                logging.warning(
                    f"Upload response status: {e.response.status_code}"
                )
                logging.warning(f"Upload response body: {e.response.text}")
            except Exception:
                pass
        return None
def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES):
    """Re-encode a link-card thumbnail as progressive JPEG under ``max_bytes``
    (quality ladder first, then dimension shrink). None on failure."""

    def _encode(im, quality):
        buf = io.BytesIO()
        im.save(buf, format="JPEG", quality=quality, optimize=True, progressive=True)
        return buf.getvalue()

    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            img = img.convert("RGB")

            w, h = img.size
            if max(w, h) > EXTERNAL_THUMB_MAX_DIMENSION:
                scale = EXTERNAL_THUMB_MAX_DIMENSION / max(w, h)
                new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
                img = img.resize(new_size, Image.LANCZOS)
                logging.info(
                    f"πŸ–ΌοΈ Resized external thumb to {new_size[0]}x{new_size[1]}"
                )

            for quality in (85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY):
                data = _encode(img, quality)
                logging.info(
                    f"πŸ–ΌοΈ External thumb candidate size at JPEG quality {quality}: "
                    f"{len(data) / 1024:.2f} KB"
                )
                if len(data) <= max_bytes:
                    return data

            for target_dim in (1000, 900, 800, 700, 600):
                resized = img.copy()
                w, h = resized.size
                if max(w, h) > target_dim:
                    scale = target_dim / max(w, h)
                    resized = resized.resize(
                        (max(1, int(w * scale)), max(1, int(h * scale))),
                        Image.LANCZOS,
                    )
                for quality in (60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY):
                    data = _encode(resized, quality)
                    logging.info(
                        f"πŸ–ΌοΈ External thumb resized to <= {target_dim}px at quality {quality}: "
                        f"{len(data) / 1024:.2f} KB"
                    )
                    if len(data) <= max_bytes:
                        return data

    except Exception as e:
        logging.warning(f"Could not compress external thumbnail: {repr(e)}")

    return None


def get_external_thumb_blob_from_url(image_url, client, http_client):
    """Download, size-check/compress, and upload a link-card thumbnail.
    Returns the blob or None (card is then posted without a thumbnail)."""
    try:
        r = http_client.get(
            image_url, timeout=MEDIA_DOWNLOAD_TIMEOUT, follow_redirects=True
        )
        if r.status_code != 200:
            logging.warning(
                f"Could not fetch external thumb {image_url}: HTTP {r.status_code}"
            )
            return None

        content = r.content
        if not content:
            logging.warning(
                f"Could not fetch external thumb {image_url}: empty body"
            )
            return None

        original_size_kb = len(content) / 1024
        logging.info(
            f"πŸ–ΌοΈ Downloaded external thumb {image_url} ({original_size_kb:.2f} KB)"
        )

        upload_bytes = content
        if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES:
            logging.info(
                f"πŸ–ΌοΈ External thumb exceeds safe limit "
                f"({original_size_kb:.2f} KB > {EXTERNAL_THUMB_MAX_BYTES / 1024:.2f} KB). Compressing..."
            )
            compressed = compress_external_thumb_to_limit(
                upload_bytes, EXTERNAL_THUMB_MAX_BYTES
            )
            if not compressed:
                logging.warning(
                    "⚠️ Could not compress external thumb to fit limit. Will omit thumbnail."
                )
                return None
            upload_bytes = compressed
            logging.info(
                f"βœ… External thumb compressed to {len(upload_bytes) / 1024:.2f} KB"
            )
        else:
            logging.info("βœ… External thumb already within safe size limit.")

        blob = upload_blob_with_retry(
            client,
            upload_bytes,
            media_label=f"external-thumb:{image_url}",
        )
        if blob:
            return blob

        logging.warning("⚠️ External thumb upload failed. Will omit thumbnail.")
        return None

    except Exception as e:
        logging.warning(
            f"Could not fetch/upload external thumb {image_url}: {repr(e)}"
        )
        return None


def fetch_link_metadata(url, http_client):
    """Scrape og:title/og:description/og:image (with fallbacks) from a page.
    Returns a dict, or {} on any failure."""
    try:
        r = http_client.get(
            url, timeout=LINK_METADATA_TIMEOUT, follow_redirects=True
        )
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")

        title = soup.find("meta", property="og:title") or soup.find("title")
        desc = (
            soup.find("meta", property="og:description")
            or soup.find("meta", attrs={"name": "description"})
        )
        image = (
            soup.find("meta", property="og:image")
            or soup.find("meta", attrs={"name": "twitter:image"})
        )

        if title and title.has_attr("content"):
            title_text = title["content"]
        elif title and title.text:
            title_text = title.text.strip()
        else:
            title_text = ""

        return {
            "title": title_text,
            "description": (
                desc["content"] if desc and desc.has_attr("content") else ""
            ),
            "image": (
                image["content"] if image and image.has_attr("content") else None
            ),
        }

    except Exception as e:
        logging.warning(f"Could not fetch link metadata for {url}: {repr(e)}")
        return {}


def build_external_link_embed(
    url, client, http_client, fallback_title="Link", prefetched_metadata=None,
):
    """Build an AppBskyEmbedExternal card for ``url`` (thumbnail best-effort).
    Returns None when no usable metadata or thumbnail was obtained."""
    link_metadata = (
        prefetched_metadata
        if prefetched_metadata is not None
        else fetch_link_metadata(url, http_client)
    )

    thumb_blob = None
    if link_metadata.get("image"):
        thumb_blob = get_external_thumb_blob_from_url(
            link_metadata["image"], client, http_client
        )
        if thumb_blob:
            logging.info("βœ… External link card thumbnail prepared successfully")
        else:
            logging.info("ℹ️ External link card will be posted without thumbnail")

    if (
        link_metadata.get("title")
        or link_metadata.get("description")
        or thumb_blob
    ):
        return models.AppBskyEmbedExternal.Main(
            external=models.AppBskyEmbedExternal.External(
                uri=url,
                title=link_metadata.get("title") or fallback_title,
                description=link_metadata.get("description") or "",
                thumb=thumb_blob,
            )
        )

    return None


def make_rich(content):
    """Build a Bluesky TextBuilder from plain text, turning valid non-X URLs
    into link facets and #hashtags into tag facets.

    NOTE: Bluesky supports native @mention facets, but resolving a Twitter
    handle to a Bluesky DID requires an external lookup. That mapping is not
    available here so @mentions are passed through as plain text. If you add
    a handle-mapping table in the future, call
    text_builder.mention(word, did) here instead of text_builder.text(word).

    NOTE(review): the word-splitting scaffolding and the line-joining tail of
    this function fall between diff hunks; the loop structure below is
    reconstructed from the visible fragments — confirm against the full file.
    """
    text_builder = client_utils.TextBuilder()
    content = clean_post_text(content)
    lines = content.splitlines()

    for line_idx, line in enumerate(lines):
        words = line.split(" ")
        for word_idx, word in enumerate(words):
            if word_idx > 0:
                text_builder.text(" ")
            if not word:
                continue

            cleaned_word = strip_trailing_url_punctuation(word)
            normalized_candidate = normalize_urlish_token(cleaned_word)

            if normalized_candidate:
                if is_x_or_twitter_domain(normalized_candidate):
                    # X/Twitter links stay plain text — no facet.
                    text_builder.text(word)
                else:
                    clean_url_value = clean_url(normalized_candidate)
                    if clean_url_value and is_valid_url(clean_url_value):
                        text_builder.link(cleaned_word, clean_url_value)
                        trailing = word[len(cleaned_word):]
                        if trailing:
                            text_builder.text(trailing)
                    else:
                        text_builder.text(word)

            elif cleaned_word.startswith("#") and len(cleaned_word) > 1:
                clean_tag = cleaned_word[1:].rstrip(".,;:!?)'\"…")
                if clean_tag:
                    text_builder.tag(cleaned_word, clean_tag)
                    trailing = word[len(cleaned_word):]
                    if trailing:
                        text_builder.text(trailing)
                else:
                    text_builder.text(word)

            else:
                text_builder.text(word)

        if line_idx < len(lines) - 1:
            text_builder.text("\n")

    return text_builder


def build_dynamic_alt(raw_text, link_title=None):
    """Derive alt text for attached media from the tweet text (URLs stripped),
    falling back to the link title, then to a generic description."""
    alt = clean_post_text(raw_text).replace("\n", " ").strip()
    alt = re.sub(r"(?:(?:https?://)|(?:www\.))\S+", "", alt).strip()

    if not alt and link_title:
        alt = link_title.strip()

    if len(alt) > DYNAMIC_ALT_MAX_LENGTH:
        return alt[: DYNAMIC_ALT_MAX_LENGTH - 3] + "..."
    if not alt:
        return "Attached video or image from tweet"
    return alt


def build_video_embed(video_blob, alt_text):
    """Wrap an uploaded video blob in an AppBskyEmbedVideo embed; None when
    the installed atproto version lacks video embed support."""
    try:
        return models.AppBskyEmbedVideo.Main(video=video_blob, alt=alt_text)
    except AttributeError:
        logging.error(
            "❌ Your atproto version does not support AppBskyEmbedVideo. Upgrade atproto."
        )
        return None
+ ) + return None -def is_rate_limited_error(error_obj) -> bool: - error_text = str(error_obj) - repr_text = repr(error_obj) - return ( - "429" in error_text or - "429" in repr_text or - "RateLimitExceeded" in error_text or - "RateLimitExceeded" in repr_text - ) +# --- Twitter Scraping --- +def scrape_tweets_via_playwright(username, password, email, target_handle): + tweets = [] + state_file = "twitter_browser_state.json" + if os.path.exists(state_file): + try: + os.chmod(state_file, SESSION_FILE_PERMISSIONS) + except Exception as e: + logging.warning( + f"⚠️ Could not set permissions on {state_file}: {e}" + ) -def is_transient_blob_error(error_obj) -> bool: - error_text = repr(error_obj) - transient_signals = [ - "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException", - "RemoteProtocolError", "ConnectError", "503", "502", "504" - ] - return any(signal in error_text for signal in transient_signals) + with sync_playwright() as p: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + clean_ua = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ) + browser_context = None + needs_login = True + session_check_page = None -def is_timeout_error(error_obj) -> bool: - text = repr(error_obj) - return any(signal in text for signal in ["InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException"]) + if os.path.exists(state_file): + logging.info( + "βœ… Found existing browser state. Attempting to bypass login..." 
def _restore_saved_session(browser, user_agent, state_file):
    """Try to reuse a saved browser session; return a context or None."""
    if not os.path.exists(state_file):
        return None

    logging.info(
        "βœ… Found existing browser state. Attempting to bypass login..."
    )
    context = browser.new_context(
        user_agent=user_agent,
        viewport={"width": 1920, "height": 1080},
        storage_state=state_file,
    )
    page = context.new_page()
    page.goto("https://x.com/home")
    time.sleep(3)

    session_ok = (
        page.locator('[data-testid="SideNav_NewTweet_Button"]').is_visible()
        or "/home" in page.url
    )
    page.close()

    if session_ok:
        logging.info("βœ… Session is valid!")
        return context

    logging.warning("⚠️ Saved session expired or invalid. Re-logging in...")
    context.close()
    os.remove(state_file)
    return None


def _login_and_save_session(browser, user_agent, state_file,
                            username, password, email):
    """Perform a fresh automated login; return the context or None on failure."""
    logging.info(
        "πŸš€ Launching fresh browser for automated Twitter login..."
    )
    context = browser.new_context(
        user_agent=user_agent,
        viewport={"width": 1920, "height": 1080},
    )
    login_page = context.new_page()

    try:
        login_page.goto("https://x.com")
        sign_in_button = login_page.get_by_text("Sign in", exact=True)
        sign_in_button.wait_for(state="visible", timeout=15000)
        sign_in_button.click(force=True)

        login_page.wait_for_selector(
            'h1:has-text("Sign in to X")',
            state="visible",
            timeout=25000,
        )
        logging.info(f"πŸ‘€ Entering username: {username}...")
        time.sleep(1)

        username_input = login_page.locator(
            'input[autocomplete="username"]'
        ).first
        username_input.wait_for(state="visible", timeout=15000)
        username_input.click(force=True)
        # Typed character-by-character to look less like automation.
        username_input.press_sequentially(username, delay=100)

        login_page.locator('button:has-text("Next")').first.click(force=True)
        login_page.wait_for_selector(
            'input[name="password"], '
            'input[data-testid="ocfEnterTextTextInput"], '
            'input[name="text"]',
            timeout=15000,
        )
        time.sleep(1)

        # X sometimes interposes an email/phone verification step here.
        if login_page.locator(
            'input[data-testid="ocfEnterTextTextInput"]'
        ).is_visible() or login_page.locator(
            'input[name="text"]'
        ).is_visible():
            logging.warning(
                "πŸ›‘οΈ Security challenge detected! Entering email/phone..."
            )
            login_page.fill(
                'input[data-testid="ocfEnterTextTextInput"], '
                'input[name="text"]',
                email,
            )
            sec_next = login_page.locator(
                '[data-testid="ocfEnterTextNextButton"], '
                'span:has-text("Next")'
            ).first
            if sec_next.is_visible():
                sec_next.click(force=True)
            else:
                login_page.keyboard.press("Enter")
            login_page.wait_for_selector(
                'input[name="password"]', timeout=15000
            )
            time.sleep(1)

        logging.info("πŸ”‘ Entering password...")
        login_page.fill('input[name="password"]', password)
        login_page.locator('span:has-text("Log in")').first.click()

        login_page.wait_for_url("**/home", timeout=20000)
        time.sleep(3)

        context.storage_state(path=state_file)
        try:
            os.chmod(state_file, SESSION_FILE_PERMISSIONS)
        except Exception as chmod_err:
            logging.warning(
                f"⚠️ Could not set permissions on {state_file} "
                f"after save: {chmod_err}"
            )
        logging.info("βœ… Login successful. Browser state saved.")
    except Exception as e:
        take_error_screenshot(login_page, "login_failed")
        logging.error(f"❌ Login failed: {e}")
        login_page.close()
        return None

    login_page.close()
    return context


def _extract_card_url(article):
    """Pull a link-card URL out of a tweet article, trying two selectors."""
    try:
        card_locator = article.locator(
            '[data-testid="card.wrapper"] a[href]'
        ).first
        if card_locator.is_visible():
            card_href = card_locator.get_attribute("href")
            if card_href:
                card_url = card_href.strip()
                logging.info(f"πŸƒ Scraped card URL from tweet: {card_url}")
                return card_url
    except Exception:
        pass

    # Fallback: some cards expose only a [role="link"] wrapper.
    try:
        card_role_link = article.locator(
            '[data-testid="card.wrapper"] [role="link"]'
        ).first
        if card_role_link.is_visible():
            card_a = card_role_link.locator("a[href]").first
            if card_a.is_visible():
                card_href = card_a.get_attribute("href")
                if card_href:
                    card_url = card_href.strip()
                    logging.info(
                        f"πŸƒ Scraped card URL (fallback) from tweet: {card_url}"
                    )
                    return card_url
    except Exception:
        pass

    return None


def _parse_tweet_article(article):
    """Parse one timeline <article> element into a ScrapedTweet, or None."""
    time_el = article.locator("time").first
    if not time_el.is_visible():
        return None

    created_at = time_el.get_attribute("datetime")

    tweet_url = None
    time_link = article.locator("a:has(time)").first
    if time_link.is_visible():
        href = time_link.get_attribute("href")
        if href:
            tweet_url = (
                f"https://x.com{href}" if href.startswith("/") else href
            )

    is_retweet = False
    try:
        social_context_el = article.locator(
            '[data-testid="socialContext"]'
        ).first
        if social_context_el.is_visible():
            context_text = social_context_el.inner_text().lower()
            # Localized repost markers (English, Catalan, Spanish).
            repost_keywords = [
                "reposted",
                "retweeted",
                "ha repostejat",
                "ha retuitat",
                "repostejat",
                "ha reposteado",
                "retuiteΓ³",
            ]
            if any(kw in context_text for kw in repost_keywords):
                is_retweet = True
                logging.info(f"πŸ” Detected retweet/repost: {tweet_url}")
    except Exception:
        pass

    text_locator = article.locator('[data-testid="tweetText"]').first
    text = text_locator.inner_text() if text_locator.is_visible() else ""

    media_urls = []
    for img in article.locator('[data-testid="tweetPhoto"] img').all():
        src = img.get_attribute("src")
        if src:
            # Request the highest-resolution variant.
            src = re.sub(r"&name=\w+", "&name=large", src)
            media_urls.append((src, "photo"))

    if article.locator('[data-testid="videoPlayer"]').all():
        media_urls.append((tweet_url or "", "video"))

    card_url = _extract_card_url(article)

    return ScrapedTweet(
        created_at,
        text,
        media_urls,
        tweet_url=tweet_url,
        card_url=card_url,
        is_retweet=is_retweet,
    )


def scrape_tweets_via_playwright(username, password, email, target_handle):
    """Scrape recent tweets from ``target_handle``'s profile with Playwright.

    Reuses a saved browser session from twitter_browser_state.json when it is
    still valid; otherwise performs a fresh automated login (handling the
    optional email/phone security challenge) and saves the new session state.
    Returns a list of ScrapedTweet objects (empty list on login failure).
    """
    tweets = []
    state_file = "twitter_browser_state.json"

    # Tighten permissions on the session file: it holds auth cookies.
    if os.path.exists(state_file):
        try:
            os.chmod(state_file, SESSION_FILE_PERMISSIONS)
        except Exception as e:
            logging.warning(
                f"⚠️ Could not set permissions on {state_file}: {e}"
            )

    clean_ua = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/145.0.7632.6 Safari/537.36"
    )

    with sync_playwright() as p:
        browser = p.chromium.launch(
            headless=True,
            args=["--disable-blink-features=AutomationControlled"],
        )

        browser_context = _restore_saved_session(browser, clean_ua, state_file)

        if browser_context is None:
            browser_context = _login_and_save_session(
                browser, clean_ua, state_file, username, password, email
            )
            if browser_context is None:
                browser.close()
                return []

        logging.info(
            f"🌐 Navigating to https://x.com/{target_handle} to scrape tweets..."
        )
        scrape_page = browser_context.new_page()
        scrape_page.goto(f"https://x.com/{target_handle}")

        try:
            scrape_page.wait_for_selector("article", timeout=20000)
            time.sleep(2)

            articles = scrape_page.locator("article").all()
            logging.info(
                f"πŸ“Š Found {len(articles)} tweets on screen. "
                f"Parsing up to {SCRAPE_TWEET_LIMIT}..."
            )

            for article in articles[:SCRAPE_TWEET_LIMIT]:
                try:
                    parsed = _parse_tweet_article(article)
                    if parsed is not None:
                        tweets.append(parsed)
                except Exception as e:
                    logging.warning(f"⚠️ Failed to parse a specific tweet: {e}")
                    continue

        except Exception as e:
            take_error_screenshot(scrape_page, "scrape_failed")
            logging.error(f"❌ Failed to scrape profile: {e}")

        browser.close()

    return tweets
def extract_video_url_from_tweet_page(browser_context, tweet_url):
    """Open a tweet page and sniff network traffic for its video URL.

    Prefers an HLS (.m3u8) playlist over a direct MP4, skips audio-only
    streams and .m4s segments, and clicks/retries the player to trigger
    playback. Returns the captured URL or None; the page is always closed.
    """
    page = browser_context.new_page()
    hls_url = None
    mp4_url = None
    observed = set()

    def _is_audio_only_mp4(url, content_type):
        # Heuristics over both the URL path and the Content-Type header.
        lowered_url = url.lower()
        lowered_ct = content_type.lower()
        return (
            "/aud/" in lowered_url
            or "/audio/" in lowered_url
            or "mp4a" in lowered_url
            or ("audio/" in lowered_ct and "video/" not in lowered_ct)
        )

    def _on_response(response):
        nonlocal hls_url, mp4_url
        try:
            url = response.url
            if url in observed:
                return
            observed.add(url)

            lowered_url = url.lower()
            content_type = response.headers.get("content-type", "")
            lowered_ct = content_type.lower()

            # Fragmented-MP4 segments are useless on their own.
            if ".m4s" in lowered_url:
                return

            is_hls = (
                ".m3u8" in lowered_url
                or "application/vnd.apple.mpegurl" in lowered_ct
                or "application/x-mpegurl" in lowered_ct
            )
            if is_hls:
                if hls_url is None:
                    hls_url = url
                    logging.info(f"πŸ“Ί Found HLS playlist URL: {url}")
                return

            is_mp4 = (
                ".mp4" in lowered_url
                or "video/mp4" in lowered_ct
                or "audio/mp4" in lowered_ct
            )
            if is_mp4:
                if _is_audio_only_mp4(url, content_type):
                    logging.info(f"πŸ”‡ Ignoring audio-only MP4: {url}")
                    return
                if mp4_url is None:
                    mp4_url = url
                    logging.info(f"πŸŽ₯ Found VIDEO MP4 URL: {url}")
                return

        except Exception as e:
            logging.debug(f"Response parsing error: {e}")

    page.on("response", _on_response)

    def _best_url():
        # HLS wins when both were seen.
        return hls_url or mp4_url

    try:
        logging.info(
            f"🎬 Opening tweet page to capture video URL: {tweet_url}"
        )
        page.goto(tweet_url, wait_until="domcontentloaded", timeout=30000)
        time.sleep(2)

        player = page.locator('[data-testid="videoPlayer"]').first

        if player.count() > 0:
            try:
                player.scroll_into_view_if_needed(timeout=5000)
            except Exception:
                pass

            try:
                player.click(force=True, timeout=5000)
                logging.info("▢️ Clicked video player")
            except Exception as e:
                logging.info(f"⚠️ First player click failed: {e}")
        else:
            logging.warning("⚠️ No video player locator found on tweet page")

        for _ in range(VIDEO_PLAYER_WAIT_ROUNDS):
            if _best_url():
                break
            time.sleep(1)

        if not _best_url() and player.count() > 0:
            logging.info(
                "πŸ” No media URL found yet, retrying player interaction..."
            )
            try:
                player.click(force=True, timeout=5000)
                time.sleep(PLAYWRIGHT_RETRY_SLEEP_S)
            except Exception as e:
                logging.info(f"⚠️ Retry click failed: {e}")

            try:
                page.keyboard.press("Space")
                time.sleep(1)
            except Exception:
                pass

            for _ in range(VIDEO_PLAYER_RETRY_ROUNDS):
                if _best_url():
                    break
                time.sleep(1)

        chosen = _best_url()
        if chosen:
            logging.info(f"βœ… Selected media URL for download: {chosen}")
        else:
            logging.warning(
                f"⚠️ No playable media URL detected on tweet page: {tweet_url}"
            )

        return chosen

    except Exception as e:
        logging.warning(
            f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}"
        )
        return None
    finally:
        page.close()
+ ) + try: + player.click(force=True, timeout=5000) + time.sleep(PLAYWRIGHT_RETRY_SLEEP_S) + except Exception as e: + logging.info(f"⚠️ Retry click failed: {e}") + + try: + page.keyboard.press("Space") + time.sleep(1) + except Exception: + pass + + for _ in range(VIDEO_PLAYER_RETRY_ROUNDS): + if current_best(): + break + time.sleep(1) + + selected_url = current_best() + if selected_url: + logging.info(f"βœ… Selected media URL for download: {selected_url}") + else: + logging.warning( + f"⚠️ No playable media URL detected on tweet page: {tweet_url}" + ) + + return selected_url + except Exception as e: - logging.warning(f"⚠️ Could not fetch recent Bluesky posts for duplicate detection: {e}") - - return recent_posts + logging.warning( + f"⚠️ Could not extract video URL from tweet page {tweet_url}: {e}" + ) + return None + finally: + page.close() -def candidate_matches_existing_bsky(candidate: EntryCandidate, recent_bsky_posts: List[RecentBskyPost]) -> Tuple[bool, Optional[str]]: - candidate_link = candidate.canonical_link - candidate_title_normalized = candidate.normalized_title +def _probe_video_duration(file_path): + probe_cmd = [ + "ffprobe", + "-v", "error", + "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", + file_path, + ] + try: + result = subprocess.run( + probe_cmd, + capture_output=True, + text=True, + timeout=FFPROBE_TIMEOUT_SECONDS, + ) + if result.returncode != 0: + raise RuntimeError( + f"ffprobe exited with code {result.returncode}: " + f"{result.stderr.strip()}" + ) + duration_str = result.stdout.strip() + if not duration_str: + raise RuntimeError("ffprobe returned empty duration output") + return float(duration_str) + except subprocess.TimeoutExpired: + raise RuntimeError( + f"ffprobe timed out after {FFPROBE_TIMEOUT_SECONDS}s on {file_path}" + ) + + +def download_and_crop_video(video_url, output_path): + temp_input = output_path.replace(".mp4", "_source.mp4") + temp_trimmed = output_path.replace(".mp4", 
"_trimmed.mp4") + temp_output = output_path.replace(".mp4", "_compressed.mp4") + + try: + logging.info(f"⬇️ Downloading video source with ffmpeg: {video_url}") + + video_url_l = video_url.lower() + + if ".m3u8" in video_url_l: + logging.info("πŸ“Ί Using HLS ffmpeg mode") + download_cmd = [ + "ffmpeg", "-y", + "-protocol_whitelist", "file,http,https,tcp,tls,crypto", + "-allowed_extensions", "ALL", + "-i", video_url, + "-c", "copy", + temp_input, + ] + else: + logging.info("πŸŽ₯ Using direct MP4 ffmpeg mode") + download_cmd = [ + "ffmpeg", "-y", + "-i", video_url, + "-c", "copy", + temp_input, + ] + + download_result = subprocess.run( + download_cmd, + capture_output=True, + text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, + ) + + if download_result.returncode != 0: + logging.error( + f"❌ ffmpeg download failed:\n{download_result.stderr}" + ) + return None + + if ( + not os.path.exists(temp_input) + or os.path.getsize(temp_input) == 0 + ): + logging.error("❌ Downloaded video source file is missing or empty.") + return None + + logging.info(f"βœ… Video downloaded: {temp_input}") + + try: + duration = _probe_video_duration(temp_input) + except RuntimeError as probe_err: + logging.error(f"❌ Could not probe video duration: {probe_err}") + return None + + if duration <= 0: + logging.error("❌ Downloaded video has invalid or unknown duration.") + return None + + end_time = min(VIDEO_MAX_DURATION_SECONDS, duration) + + video_clip = VideoFileClip(temp_input) + try: + if hasattr(video_clip, "subclipped"): + cropped_clip = video_clip.subclipped(0, end_time) + else: + cropped_clip = video_clip.subclip(0, end_time) + + try: + cropped_clip.write_videofile( + temp_trimmed, + codec="libx264", + audio_codec="aac", + preset="veryfast", + bitrate="1800k", + audio_bitrate="128k", + logger=None, + ) + finally: + cropped_clip.close() + finally: + video_clip.close() + + if ( + not os.path.exists(temp_trimmed) + or os.path.getsize(temp_trimmed) == 0 + ): + logging.error("❌ Trimmed video 
output is missing or empty.") + return None + + trimmed_size_mb = os.path.getsize(temp_trimmed) / (1024 * 1024) + logging.info( + f"πŸ“¦ Trimmed video size before compression: {trimmed_size_mb:.2f} MB" + ) + + compress_cmd = [ + "ffmpeg", "-y", + "-i", temp_trimmed, + "-vf", "scale='min(720,iw)':-2", + "-c:v", "libx264", + "-preset", "veryfast", + "-crf", "30", + "-maxrate", "1800k", + "-bufsize", "3600k", + "-c:a", "aac", + "-b:a", "128k", + "-movflags", "+faststart", + temp_output, + ] + + compress_result = subprocess.run( + compress_cmd, + capture_output=True, + text=True, + timeout=SUBPROCESS_TIMEOUT_SECONDS, + ) + + if compress_result.returncode != 0: + logging.error( + f"❌ ffmpeg compression failed:\n{compress_result.stderr}" + ) + return None + + if ( + not os.path.exists(temp_output) + or os.path.getsize(temp_output) == 0 + ): + logging.error("❌ Compressed video output is missing or empty.") + return None + + final_size_mb = os.path.getsize(temp_output) / (1024 * 1024) + logging.info( + f"βœ… Video compressed successfully: {temp_output} ({final_size_mb:.2f} MB)" + ) + + os.replace(temp_output, output_path) + logging.info(f"βœ… Final video ready: {output_path}") + return output_path + + except subprocess.TimeoutExpired: + logging.error( + f"❌ ffmpeg subprocess timed out after {SUBPROCESS_TIMEOUT_SECONDS}s" + ) + return None + + except Exception as e: + logging.error(f"❌ Error processing video: {repr(e)}") + return None + + finally: + remove_file_quietly(temp_input) + remove_file_quietly(temp_trimmed) + remove_file_quietly(temp_output) + + +# --- Deduplication --- +def candidate_matches_existing_bsky(candidate, recent_bsky_posts): + candidate_non_x_urls = candidate["canonical_non_x_urls"] + candidate_text_media_key = candidate["text_media_key"] + candidate_normalized_text = candidate["normalized_text"] for existing in recent_bsky_posts: - if candidate_link and candidate_link in existing.canonical_non_x_urls: - return True, "bsky:canonical_link" + 
existing_non_x_urls = existing["canonical_non_x_urls"] - if candidate_title_normalized and candidate_title_normalized in existing.normalized_text: - if not candidate_link or candidate_link in existing.canonical_non_x_urls: - return True, "bsky:title_plus_link" + if ( + candidate_non_x_urls + and candidate_non_x_urls == existing_non_x_urls + and candidate_normalized_text == existing["normalized_text"] + ): + return True, "bsky:normalized_text_plus_non_x_urls" + + if candidate_text_media_key == existing["text_media_key"]: + return True, "bsky:text_media_fingerprint" + + if candidate_normalized_text == existing["normalized_text"]: + return True, "bsky:normalized_text" return False, None -def upload_blob_with_retry( - client: Client, - binary_data: bytes, - cfg: AppConfig, - media_label: str = "media", - optional: bool = False, - cooldown_on_rate_limit: bool = False, - cooldown_path: Optional[str] = None, -): - last_exception = None - transient_attempts = 0 +# --- Main Sync Logic --- +def sync_feeds(args): + logging.info("πŸ”„ Starting sync cycle...") - for attempt in range(1, cfg.retry.blob_upload_max_retries + 1): - try: - result = client.upload_blob(binary_data) - return result.blob + dry_run = getattr(args, "dry_run", False) + bsky_langs = getattr(args, "bsky_langs", None) or DEFAULT_BSKY_LANGS - except Exception as e: - last_exception = e + if dry_run: + logging.info("πŸ§ͺ DRY RUN MODE β€” no posts will be created on Bluesky.") - if is_rate_limited_error(e): - if cooldown_on_rate_limit and cooldown_path: - activate_thumb_upload_cooldown_from_error(e, cooldown_path, cfg) + try: + state = load_state(STATE_PATH) + state = prune_state(state, max_entries=5000) - if optional and cooldown_on_rate_limit: - logging.warning( - f"🟑 Optional blob upload rate-limited for {media_label}. " - f"Skipping remaining retries and omitting optional media." 
- ) - return None + tweets = scrape_tweets_via_playwright( + args.twitter_username, + args.twitter_password, + args.twitter_email, + args.twitter_handle, + ) - backoff_delay = min( - cfg.retry.blob_upload_base_delay * (2 ** (attempt - 1)), - cfg.retry.blob_upload_max_delay - ) - wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay, cfg) + if not tweets: + logging.warning( + "⚠️ No tweets found or failed to fetch. " + "Skipping Bluesky sync for this cycle." + ) + return - if attempt < cfg.retry.blob_upload_max_retries: - logging.warning( - f"⏳ Blob upload rate-limited for {media_label}. " - f"Retry {attempt}/{cfg.retry.blob_upload_max_retries} after {wait_seconds}s." - ) - time.sleep(wait_seconds) + bsky_client = None + if not dry_run: + bsky_client = create_bsky_client( + args.bsky_base_url, + args.bsky_handle, + args.bsky_password, + ) + + recent_bsky_posts = [] + if not dry_run: + recent_bsky_posts = get_recent_bsky_posts( + bsky_client, + args.bsky_handle, + limit=DEDUPE_BSKY_LIMIT, + ) + + logging.info( + f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts " + f"for duplicate detection." + ) + logging.info( + f"🧠 Local state currently tracks " + f"{len(state.get('posted_tweets', {}))} posted items." + ) + + too_old_cutoff = arrow.utcnow().shift(days=-TWEET_MAX_AGE_DAYS) + logging.info(f"πŸ•’ Will ignore tweets older than: {too_old_cutoff}") + + candidate_tweets = [] + cheap_candidates = [] + + for tweet in reversed(tweets): + try: + tweet_time = arrow.get(tweet.created_on) + + if tweet_time < too_old_cutoff: + logging.info(f"⏭️ Skipping old tweet from {tweet_time}") continue - logging.warning(f"⚠️ Exhausted blob upload retries for {media_label}: {repr(e)}") - break - - if is_transient_blob_error(e) and transient_attempts < cfg.retry.blob_transient_error_retries: - transient_attempts += 1 - wait_seconds = cfg.retry.blob_transient_error_delay * transient_attempts - logging.warning( - f"⏳ Transient blob upload failure for {media_label}: {repr(e)}. 
" - f"Retry {transient_attempts}/{cfg.retry.blob_transient_error_retries} after {wait_seconds}s." - ) - time.sleep(wait_seconds) - continue - - logging.warning(f"⚠️ Could not upload {media_label}: {repr(e)}") - return None - - logging.warning(f"⚠️ Could not upload {media_label}: {repr(last_exception)}") - return None - - -def try_send_post_with_variants(client: Client, text_variants: List[str], embed, post_lang: str, cooldown_path: str, cfg: AppConfig): - if is_global_post_cooldown_active(cooldown_path): - reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) - raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}") - - last_exception = None - - for idx, variant in enumerate(text_variants, start=1): - try: - if is_global_post_cooldown_active(cooldown_path): - reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) - raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}") - - logging.info(f"πŸ“ Trying post text variant {idx}/{len(text_variants)} (length={len(variant)})") - rich_text = make_rich(variant) - result = client.send_post(text=rich_text, embed=embed, langs=[post_lang]) - return result, variant - - except Exception as e: - last_exception = e - logging.warning(f"⚠️ Post variant {idx} failed: {repr(e)}") - - if is_rate_limited_error(e): - activate_post_creation_cooldown_from_error(e, cooldown_path, cfg) - raise - - if is_timeout_error(e): - raise - - if not is_probable_length_error(e): - raise - - if last_exception: - raise last_exception - raise RuntimeError("No text variants available to post.") - - -# ============================================================ -# Embeds / metadata / image compression -# ============================================================ -def compress_external_thumb_to_limit(image_bytes: bytes, cfg: AppConfig): - if not PIL_AVAILABLE: - return None - - try: - with Image.open(io.BytesIO(image_bytes)) 
as img: - img = img.convert("RGB") - - width, height = img.size - max_dim = max(width, height) - - if max_dim > cfg.limits.external_thumb_max_dimension: - scale = cfg.limits.external_thumb_max_dimension / max_dim - new_size = (max(1, int(width * scale)), max(1, int(height * scale))) - img = img.resize(new_size, Image.LANCZOS) - logging.info(f"πŸ–ΌοΈ Resized external thumb to {new_size[0]}x{new_size[1]}") - - best_so_far = None # explicit fix - - for quality in [78, 70, 62, 54, 46, 40, cfg.limits.external_thumb_min_jpeg_quality]: - out = io.BytesIO() - img.save(out, format="JPEG", quality=quality, optimize=True, progressive=True) - data = out.getvalue() - - logging.info(f"πŸ–ΌοΈ External thumb candidate size at JPEG quality {quality}: {len(data) / 1024:.2f} KB") - - if len(data) <= cfg.limits.external_thumb_target_bytes: - return data - - if len(data) <= cfg.limits.external_thumb_max_bytes: - best_so_far = data - - if best_so_far and len(best_so_far) <= cfg.limits.external_thumb_max_bytes: - return best_so_far - - for target_dim in [900, 800, 700, 600, 500]: - resized = img.copy() - width, height = resized.size - max_dim = max(width, height) - - if max_dim > target_dim: - scale = target_dim / max_dim - new_size = (max(1, int(width * scale)), max(1, int(height * scale))) - resized = resized.resize(new_size, Image.LANCZOS) - - for quality in [54, 46, 40, cfg.limits.external_thumb_min_jpeg_quality]: - out = io.BytesIO() - resized.save(out, format="JPEG", quality=quality, optimize=True, progressive=True) - data = out.getvalue() - + if tweet.is_retweet: logging.info( - f"πŸ–ΌοΈ External thumb resized to <= {target_dim}px at quality {quality}: " - f"{len(data) / 1024:.2f} KB" + f"⏭️ Skipping retweet/repost: {tweet.tweet_url}" ) + continue - if len(data) <= cfg.limits.external_thumb_target_bytes: - return data + canonical_tweet_url = canonicalize_tweet_url(tweet.tweet_url) + if canonical_tweet_url and canonical_tweet_url in state.get( + "posted_tweets", {} + ): + 
logging.info( + f"⚑ Early skip due to known tweet URL in local state: " + f"{canonical_tweet_url}" + ) + continue - if len(data) <= cfg.limits.external_thumb_max_bytes: - best_so_far = data + scraped_text = clean_post_text(tweet.text or "") + if not scraped_text and not tweet.media: + logging.info( + f"⏭️ Skipping empty/blank tweet from {tweet_time}" + ) + continue - if best_so_far and len(best_so_far) <= cfg.limits.external_thumb_max_bytes: - return best_so_far - - except Exception as e: - logging.warning(f"⚠️ Could not compress external thumbnail: {repr(e)}") - - return None - - -def fetch_link_metadata(url: str, http_client: httpx.Client, cfg: AppConfig): - try: - r = http_client.get(url, timeout=cfg.network.http_timeout, follow_redirects=True) - r.raise_for_status() - soup = BeautifulSoup(r.text, "html.parser") - - title = (soup.find("meta", property="og:title") or soup.find("title")) - desc = (soup.find("meta", property="og:description") or soup.find("meta", attrs={"name": "description"})) - image = (soup.find("meta", property="og:image") or soup.find("meta", attrs={"name": "twitter:image"})) - - return { - "title": title["content"] if title and title.has_attr("content") else (title.text.strip() if title and title.text else ""), - "description": desc["content"] if desc and desc.has_attr("content") else "", - "image": image["content"] if image and image.has_attr("content") else None, - } - except Exception as e: - logging.warning(f"⚠️ Could not fetch link metadata for {url}: {e}") - return {} - - -def get_external_thumb_blob_from_url(image_url: str, client: Client, http_client: httpx.Client, cooldown_path: str, cfg: AppConfig): - if check_thumb_cooldown_or_log(cooldown_path): - return None - - try: - r = http_client.get(image_url, timeout=cfg.network.http_timeout, follow_redirects=True) - if r.status_code != 200: - logging.warning(f"⚠️ Could not fetch external thumb {image_url}: HTTP {r.status_code}") - return None - - content = r.content - if not content: - 
logging.warning(f"⚠️ Could not fetch external thumb {image_url}: empty body") - return None - - logging.info(f"πŸ–ΌοΈ Downloaded external thumb {image_url} ({len(content) / 1024:.2f} KB)") - - upload_bytes = compress_external_thumb_to_limit(content, cfg) - if not upload_bytes: - logging.warning("⚠️ Could not prepare compressed external thumbnail. Omitting thumbnail.") - return None - - logging.info(f"πŸ–ΌοΈ Final external thumb upload size: {len(upload_bytes) / 1024:.2f} KB") - - blob = upload_blob_with_retry( - client=client, - binary_data=upload_bytes, - cfg=cfg, - media_label=f"external-thumb:{image_url}", - optional=True, - cooldown_on_rate_limit=True, - cooldown_path=cooldown_path - ) - if blob: - logging.info("βœ… External thumbnail uploaded successfully") - return blob - - logging.warning("⚠️ External thumbnail upload failed. Will omit thumbnail.") - return None - - except Exception as e: - logging.warning(f"⚠️ Could not fetch/upload external thumb {image_url}: {repr(e)}") - return None - - -def build_external_link_embed(url: str, fallback_title: str, client: Client, http_client: httpx.Client, cooldown_path: str, cfg: AppConfig): - link_metadata = fetch_link_metadata(url, http_client, cfg) - - thumb_blob = None - if link_metadata.get("image"): - thumb_blob = get_external_thumb_blob_from_url(link_metadata["image"], client, http_client, cooldown_path, cfg) - if thumb_blob: - logging.info("βœ… External link card thumbnail prepared successfully") - else: - logging.info("ℹ️ External link card will be posted without thumbnail") - else: - logging.info("ℹ️ No og:image found for external link card") - - if link_metadata.get("title") or link_metadata.get("description") or thumb_blob: - return models.AppBskyEmbedExternal.Main( - external=models.AppBskyEmbedExternal.External( - uri=url, - title=link_metadata.get("title") or fallback_title or "EnllaΓ§", - description=link_metadata.get("description") or "", - thumb=thumb_blob, - ) - ) - return None - - -# 
============================================================ -# Feed helpers -# ============================================================ -def parse_entry_time(item): - candidates = [getattr(item, "published", None), getattr(item, "updated", None), getattr(item, "pubDate", None)] - for candidate in candidates: - if candidate: - try: - return arrow.get(candidate) - except Exception: - continue - return None - - -def fetch_feed_content(feed_url: str, http_client: httpx.Client, cfg: AppConfig) -> str: - response = http_client.get(feed_url, timeout=cfg.network.http_timeout, follow_redirects=True) - response.raise_for_status() - - try: - result = charset_normalizer.from_bytes(response.content).best() - if not result or not hasattr(result, "text"): - raise ValueError("Could not detect feed encoding.") - return result.text - except ValueError: - logging.warning("⚠️ Could not detect feed encoding with charset_normalizer. Trying latin-1.") - try: - return response.content.decode("latin-1") - except UnicodeDecodeError: - logging.warning("⚠️ Could not decode with latin-1. 
Trying utf-8 with ignored errors.") - return response.content.decode("utf-8", errors="ignore") - - -def build_candidates_from_feed(feed) -> List[EntryCandidate]: - candidates: List[EntryCandidate] = [] - - for item in getattr(feed, "entries", []): - try: - title_text = process_title(getattr(item, "title", "") or "") - link = canonicalize_url(getattr(item, "link", "") or "") - published_at = parse_entry_time(item) - - if not title_text and not link: - logging.info("⏭️ Skipping feed item with no usable title and no link.") - continue - - normalized_title = normalize_text(title_text) - entry_fingerprint = build_entry_fingerprint(normalized_title, link) - - candidates.append(EntryCandidate( - item=item, - title_text=title_text, - normalized_title=normalized_title, - canonical_link=link, - published_at=published_at.isoformat() if published_at else None, - published_arrow=published_at, - entry_fingerprint=entry_fingerprint, - post_text_variants=build_post_text_variants(title_text, link), - )) - - except Exception as e: - logging.warning(f"⚠️ Failed to prepare feed entry candidate: {e}") - - candidates.sort(key=lambda c: c.published_arrow or arrow.get(0)) - return candidates - - -# ============================================================ -# Orchestration -# ============================================================ -def login_with_backoff(client: Client, bsky_username: str, bsky_password: str, service_url: str): - backoff = 60 - while True: - try: - if check_post_cooldown_or_log(args.cooldown_path): - return False - logging.info(f"πŸ” Attempting login to server: {service_url} with user: {bsky_username}") - client.login(bsky_username, bsky_password) - logging.info(f"βœ… Login successful for user: {bsky_username}") - return True - except Exception: - logging.exception("❌ Login exception") - time.sleep(backoff) - backoff = min(backoff + 60, 600) - - -def run_once( - rss_feed: str, - bsky_handle: str, - bsky_username: str, - bsky_password: str, - service_url: str, - 
post_lang: str, - state_path: str, - cooldown_path: str, - cfg: AppConfig -) -> RunResult: - if not PIL_AVAILABLE: - logging.warning("🟑 Pillow is not installed. External card thumbnail compression is disabled.") - - if check_post_cooldown_or_log(cooldown_path): - return RunResult(published_count=0, stopped_reason="global_post_cooldown_active") - - client = Client(base_url=service_url) - backoff = 60 - while True: - try: - if check_post_cooldown_or_log(cooldown_path): - return RunResult(published_count=0, stopped_reason="global_post_cooldown_active") - logging.info(f"πŸ” Attempting login to server: {service_url} with user: {bsky_username}") - client.login(bsky_username, bsky_password) - logging.info(f"βœ… Login successful for user: {bsky_username}") - break - except Exception: - logging.exception("❌ Login exception") - time.sleep(backoff) - backoff = min(backoff + 60, 600) - - state = load_state(state_path) - recent_bsky_posts = get_recent_bsky_posts(client, bsky_handle, limit=cfg.limits.dedupe_bsky_limit) - - logging.info(f"🧠 Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.") - logging.info(f"🧠 Local state currently tracks {len(state.get('posted_entries', {}))} posted items.") - - with httpx.Client() as http_client: - feed_content = fetch_feed_content(rss_feed, http_client, cfg) - feed = fastfeedparser.parse(feed_content) - candidates = build_candidates_from_feed(feed) - - logging.info(f"πŸ“° Prepared {len(candidates)} feed entry candidates for duplicate comparison.") - - entries_to_post: List[EntryCandidate] = [] - for candidate in candidates: - is_dup_state, reason_state = candidate_matches_state(candidate, state) - if is_dup_state: - logging.info(f"⏭️ Skipping candidate due to local state duplicate match on: {reason_state}") - continue - - is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts) - if is_dup_bsky: - logging.info(f"⏭️ Skipping candidate due to recent Bluesky duplicate match on: 
{reason_bsky}") - continue - - entries_to_post.append(candidate) - - logging.info(f"πŸ“¬ {len(entries_to_post)} entries remain after duplicate filtering.") - - if not entries_to_post: - logging.info("ℹ️ Execution finished: no new entries to publish.") - return RunResult(published_count=0) - - if check_post_cooldown_or_log(cooldown_path): - return RunResult(published_count=0, stopped_reason="global_post_cooldown_active") - - published = 0 - - for candidate in entries_to_post: - if is_global_post_cooldown_active(cooldown_path): - reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) - logging.error(f"πŸ›‘ === BSKY POST STOPPED: GLOBAL COOLDOWN === Skipping remaining entries until {reset_str}") - break - - title_text = candidate.title_text - canonical_link = candidate.canonical_link - text_variants = candidate.post_text_variants - - logging.info(f"πŸ“° Preparing to post RSS entry: {canonical_link or title_text}") - logging.info(f"πŸš€ === BSKY POST START === {canonical_link or title_text}") - - embed = None - if canonical_link: - embed = build_external_link_embed( - canonical_link, - fallback_title=title_text or "EnllaΓ§", - client=client, - http_client=http_client, - cooldown_path=cooldown_path, - cfg=cfg + cheap_candidates.append( + (tweet, tweet_time, canonical_tweet_url) ) - try: - post_result, posted_text = try_send_post_with_variants( - client=client, - text_variants=text_variants, - embed=embed, - post_lang=post_lang, - cooldown_path=cooldown_path, - cfg=cfg - ) - - bsky_uri = getattr(post_result, "uri", None) - - remember_posted_entry(state, candidate, posted_text=posted_text, bsky_uri=bsky_uri) - state = prune_state(state, max_entries=cfg.limits.state_max_entries) - save_state(state, state_path) - - recent_bsky_posts.insert(0, RecentBskyPost( - uri=bsky_uri, - text=posted_text, - normalized_text=normalize_text(posted_text), - canonical_non_x_urls={canonical_link} if canonical_link else set(), - created_at=arrow.utcnow().isoformat(), 
- )) - recent_bsky_posts = recent_bsky_posts[:cfg.limits.dedupe_bsky_limit] - - published += 1 - logging.info(f"βœ… === BSKY POST SUCCESS === {canonical_link or title_text}") - logging.info(f"πŸŽ‰ Posted RSS entry to Bluesky: {canonical_link or title_text}") - time.sleep(cfg.retry.post_retry_delay_seconds) - except Exception as e: - if is_rate_limited_error(e): - reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) - logging.error(f"❌ === BSKY POST FAILED === {canonical_link or title_text}") - logging.error(f"πŸ›‘ === BSKY POST STOPPED: RATE LIMITED === Ending publish loop until {reset_str}") - break + logging.warning(f"⚠️ Failed during cheap prefilter: {e}") - if "global post cooldown is active" in str(e).lower(): - reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) - logging.warning(f"🟑 === BSKY POST SKIPPED: GLOBAL COOLDOWN === {canonical_link or title_text}") - logging.warning(f"πŸ›‘ === BSKY POST STOPPED: GLOBAL COOLDOWN === Ending publish loop until {reset_str}") - break + logging.info( + f"⚑ {len(cheap_candidates)} tweets remain after cheap prefilter." 
+ ) - if is_timeout_error(e): - logging.error(f"⏰ === BSKY POST FAILED === {canonical_link or title_text} :: timeout") - break + with httpx.Client() as resolve_http_client: + for tweet, tweet_time, canonical_tweet_url in cheap_candidates: + try: + ( + full_clean_text, + resolved_primary_external_url, + ) = build_effective_tweet_text(tweet, resolve_http_client) - logging.exception(f"❌ === BSKY POST FAILED === {canonical_link or title_text}") + normalized_text = normalize_post_text(full_clean_text) - if published > 0: - logging.info(f"πŸŽ‰ Execution finished: published {published} new entries to Bluesky.") - else: - logging.info("ℹ️ Execution finished: no new entries were published.") + if not normalized_text and not tweet.media: + logging.info( + f"⏭️ Skipping empty/blank tweet after enrichment " + f"from {tweet_time}" + ) + continue - return RunResult(published_count=published) + ordered_non_x_urls = extract_ordered_non_x_urls( + full_clean_text + ) + canonical_non_x_urls = set() + if resolved_primary_external_url: + canonical_non_x_urls.add( + canonicalize_url(resolved_primary_external_url) + ) + + for raw_url in ordered_non_x_urls: + if not is_tco_domain( + raw_url + ) and not is_x_or_twitter_domain(raw_url): + canonical_non_x_urls.add(canonicalize_url(raw_url)) + + primary_non_x_url = None + if resolved_primary_external_url: + primary_non_x_url = resolved_primary_external_url + else: + primary_non_x_url = extract_first_visible_non_x_url( + full_clean_text + ) + if not primary_non_x_url and ordered_non_x_urls: + primary_non_x_url = ordered_non_x_urls[0] + + has_video = any( + getattr(m, "type", None) == "video" + for m in (tweet.media or []) + ) + has_photo = any( + getattr(m, "type", None) == "photo" + for m in (tweet.media or []) + ) + + raw_text = choose_final_visible_text( + full_clean_text, + primary_non_x_url=primary_non_x_url, + prefer_full_text_without_url=False, + ) + + media_fingerprint = build_media_fingerprint(tweet) + text_media_key = 
build_text_media_key( + normalized_text, media_fingerprint + ) + + candidate = { + "tweet": tweet, + "tweet_time": tweet_time, + "raw_text": raw_text, + "full_clean_text": full_clean_text, + "normalized_text": normalized_text, + "media_fingerprint": media_fingerprint, + "text_media_key": text_media_key, + "canonical_tweet_url": canonical_tweet_url, + "canonical_non_x_urls": canonical_non_x_urls, + "ordered_non_x_urls": ordered_non_x_urls, + "primary_non_x_url": primary_non_x_url, + "resolved_primary_external_url": resolved_primary_external_url, + "looks_like_title_plus_url": looks_like_title_plus_url_post( + full_clean_text + ), + "has_video": has_video, + "has_photo": has_photo, + } + + is_dup_state, reason_state = candidate_matches_state( + candidate, state + ) + if is_dup_state: + logging.info( + f"⏭️ Skipping candidate due to local state duplicate " + f"match on: {reason_state}" + ) + continue + + is_dup_bsky, reason_bsky = candidate_matches_existing_bsky( + candidate, recent_bsky_posts + ) + if is_dup_bsky: + logging.info( + f"⏭️ Skipping candidate due to recent Bluesky duplicate " + f"match on: {reason_bsky}" + ) + continue + + candidate_tweets.append(candidate) + + except Exception as e: + logging.warning( + f"⚠️ Failed to prepare candidate tweet: {e}" + ) + + logging.info( + f"πŸ“¬ {len(candidate_tweets)} tweets remain after duplicate filtering." + ) + + if not candidate_tweets: + logging.info( + "βœ… No new tweets need posting after duplicate comparison." 
+ ) + return + + new_posts = 0 + browser_state_file = "twitter_browser_state.json" + + with sync_playwright() as p, httpx.Client() as media_http_client: + browser = p.chromium.launch( + headless=True, + args=["--disable-blink-features=AutomationControlled"], + ) + context_kwargs = { + "user_agent": ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/145.0.7632.6 Safari/537.36" + ), + "viewport": {"width": 1920, "height": 1080}, + } + if os.path.exists(browser_state_file): + context_kwargs["storage_state"] = browser_state_file + + browser_context = browser.new_context(**context_kwargs) + + for candidate in candidate_tweets: + tweet = candidate["tweet"] + tweet_time = candidate["tweet_time"] + raw_text = candidate["raw_text"] + full_clean_text = candidate["full_clean_text"] + + logging.info( + f"πŸ“ {'[DRY RUN] Would post' if dry_run else 'Posting'} " + f"missing tweet from {tweet_time} to Bluesky..." + ) + + if dry_run: + logging.info( + f" πŸ“„ Text: {raw_text[:200]}" + f"{'...' 
if len(raw_text) > 200 else ''}" + ) + logging.info( + f" πŸ”— Primary external URL: " + f"{candidate.get('resolved_primary_external_url', 'None')}" + ) + logging.info( + f" πŸƒ Card URL: {getattr(tweet, 'card_url', 'None')}" + ) + logging.info( + f" 🎬 Has video: {candidate.get('has_video', False)}" + ) + logging.info( + f" πŸ–ΌοΈ Has photo: {candidate.get('has_photo', False)}" + ) + logging.info( + f" πŸ” Is retweet: {getattr(tweet, 'is_retweet', False)}" + ) + + remember_posted_tweet( + state, + candidate, + bsky_uri=f"dry-run:{arrow.utcnow().isoformat()}", + ) + save_state(state, STATE_PATH) + new_posts += 1 + continue + + link_meta_for_alt: dict = {} + if candidate.get("resolved_primary_external_url"): + try: + link_meta_for_alt = fetch_link_metadata( + candidate["resolved_primary_external_url"], + media_http_client, + ) + except Exception: + pass + + rich_text = make_rich(raw_text) + dynamic_alt = build_dynamic_alt( + full_clean_text, + link_title=link_meta_for_alt.get("title"), + ) + + image_embeds = [] + video_embed = None + external_embed = None + media_upload_failures = [] + + has_video = candidate.get("has_video", False) + + if has_video: + video_media = next( + ( + m + for m in (tweet.media or []) + if getattr(m, "type", None) == "video" + ), + None, + ) + + if video_media: + if not tweet.tweet_url: + logging.warning( + "⚠️ Tweet has video marker but no tweet URL. " + "Skipping video." 
+ ) + media_upload_failures.append("video:no_tweet_url") + else: + temp_video_base = make_unique_video_temp_base( + tweet.tweet_url + ) + temp_video_path = f"{temp_video_base}.mp4" + + try: + real_video_url = ( + extract_video_url_from_tweet_page( + browser_context, tweet.tweet_url + ) + ) + if not real_video_url: + logging.warning( + f"⚠️ Could not resolve playable video URL " + f"for {tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:resolve_failed:{tweet.tweet_url}" + ) + else: + cropped_video_path = download_and_crop_video( + real_video_url, temp_video_path + ) + if not cropped_video_path: + logging.warning( + f"⚠️ Video download/crop failed for " + f"{tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:crop_failed:{tweet.tweet_url}" + ) + else: + video_blob = get_blob_from_file( + cropped_video_path, bsky_client + ) + if not video_blob: + logging.warning( + f"⚠️ Video upload blob failed for " + f"{tweet.tweet_url}" + ) + media_upload_failures.append( + f"video:upload_failed:{tweet.tweet_url}" + ) + else: + video_embed = build_video_embed( + video_blob, dynamic_alt + ) + if not video_embed: + media_upload_failures.append( + f"video:embed_failed:{tweet.tweet_url}" + ) + finally: + remove_file_quietly(temp_video_path) + remove_file_quietly( + f"{temp_video_base}_source.mp4" + ) + remove_file_quietly( + f"{temp_video_base}_trimmed.mp4" + ) + remove_file_quietly( + f"{temp_video_base}_compressed.mp4" + ) + + if not video_embed: + logging.warning( + "⚠️ Tweet contains video, but video could not be " + "posted. Skipping photo fallback for this tweet." 
+ ) + + else: + if tweet.media: + for media in tweet.media: + if media.type == "photo": + blob = get_blob_from_url( + media.media_url_https, + bsky_client, + media_http_client, + ) + if blob: + image_embeds.append( + models.AppBskyEmbedImages.Image( + alt=dynamic_alt, + image=blob, + ) + ) + else: + media_upload_failures.append( + f"photo:{media.media_url_https}" + ) + + # --- External link card logic --- + if not video_embed and not image_embeds: + candidate_url = candidate.get( + "resolved_primary_external_url" + ) + + if candidate_url: + if candidate.get("looks_like_title_plus_url"): + logging.info( + f"πŸ”— Detected title+URL post style. " + f"Using resolved URL for external card: " + f"{candidate_url}" + ) + else: + logging.info( + f"πŸ”— Using resolved first external URL for " + f"external card: {candidate_url}" + ) + + external_embed = build_external_link_embed( + candidate_url, + bsky_client, + media_http_client, + fallback_title="Link", + prefetched_metadata=link_meta_for_alt or None, + ) + + if external_embed: + logging.info( + f"βœ… Built external link card for URL: " + f"{candidate_url}" + ) + else: + logging.info( + f"ℹ️ Could not build external link card metadata " + f"for URL: {candidate_url}" + ) + + try: + post_result = None + post_mode = "text" + + if video_embed: + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + embed=video_embed, + langs=bsky_langs, + ) + post_mode = "video" + elif image_embeds: + embed = models.AppBskyEmbedImages.Main( + images=image_embeds + ) + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + embed=embed, + langs=bsky_langs, + ) + post_mode = f"images:{len(image_embeds)}" + elif external_embed: + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + embed=external_embed, + langs=bsky_langs, + ) + post_mode = "external_link_card" + else: + post_result = send_post_with_retry( + bsky_client, + text=rich_text, + langs=bsky_langs, + ) + post_mode = "text_only" + + 
bsky_uri = getattr(post_result, "uri", None) + + remember_posted_tweet( + state, candidate, bsky_uri=bsky_uri + ) + state = prune_state(state, max_entries=5000) + save_state(state, STATE_PATH) + + recent_bsky_posts.insert( + 0, + { + "uri": bsky_uri, + "text": raw_text, + "normalized_text": candidate["normalized_text"], + "canonical_non_x_urls": candidate[ + "canonical_non_x_urls" + ], + "media_fingerprint": candidate["media_fingerprint"], + "text_media_key": candidate["text_media_key"], + "created_at": arrow.utcnow().isoformat(), + }, + ) + recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] + + new_posts += 1 + + if media_upload_failures: + logging.warning( + f"βœ… Posted tweet to Bluesky with degraded media " + f"mode ({post_mode}). " + f"Failed media items: {media_upload_failures}" + ) + else: + logging.info( + f"βœ… Posted new tweet to Bluesky with mode " + f"{post_mode}: {raw_text}" + ) + + time.sleep(5) + + except Exception as e: + logging.error( + f"❌ Failed to post tweet to Bluesky: {e}" + ) + + browser.close() + + logging.info( + f"βœ… Sync complete. Posted {new_posts} new updates." + ) + + except Exception as e: + logging.error(f"❌ Error during sync cycle: {e}") -# ============================================================ -# CLI -# ============================================================ def main(): - setup_logging() + load_dotenv() + + parser = argparse.ArgumentParser(description="Twitter to Bluesky Sync") + parser.add_argument( + "--twitter-username", + help="Your Twitter login username", + ) + parser.add_argument( + "--twitter-password", + # NOTE (FIX #15): passwords passed via CLI are visible in `ps aux`. + # Prefer setting TWITTER_PASSWORD in your .env file instead. 
+ help="Your Twitter login password", + ) + parser.add_argument( + "--twitter-email", + help="Your Twitter email for security challenges", + ) + parser.add_argument( + "--twitter-handle", + help="The Twitter account to scrape", + ) + parser.add_argument( + "--bsky-handle", + help="Your Bluesky handle", + ) + parser.add_argument( + "--bsky-password", + # NOTE (FIX #15): same warning as --twitter-password above. + # Prefer setting BSKY_APP_PASSWORD in your .env file instead. + help="Your Bluesky app password", + ) + parser.add_argument( + "--bsky-base-url", + help="Bluesky/ATProto PDS base URL, e.g. https://eurosky.social", + ) + parser.add_argument( + "--bsky-langs", + help="Comma-separated language codes for Bluesky posts (default: ca)", + default=None, + ) + parser.add_argument( + "--dry-run", + action="store_true", + default=False, + help=( + "Simulate sync without posting to Bluesky. " + "Logs what would be posted." + ), + ) - parser = argparse.ArgumentParser(description="Post RSS to Bluesky with shared cooldown tracking.") - parser.add_argument("rss_feed", help="RSS feed URL") - parser.add_argument("bsky_handle", help="Bluesky handle") - parser.add_argument("bsky_username", help="Bluesky username") - parser.add_argument("bsky_app_password", help="Bluesky app password") - parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL") - parser.add_argument("--lang", default="ca", help="Language code for the post") - parser.add_argument("--state-path", default=DEFAULT_STATE_PATH, help="Path to local JSON state file") - parser.add_argument("--cooldown-path", default=DEFAULT_COOLDOWN_STATE_PATH, help="Path to shared cooldown JSON state file") args = parser.parse_args() - cfg = AppConfig() - - run_once( - rss_feed=args.rss_feed, - bsky_handle=args.bsky_handle, - bsky_username=args.bsky_username, - bsky_password=args.bsky_app_password, - service_url=args.service, - post_lang=args.lang, - state_path=args.state_path, - 
cooldown_path=args.cooldown_path, - cfg=cfg + # Resolve credentials: CLI args take priority, then env vars. + # FIX #15 β€” env vars are the secure path; CLI args expose secrets in + # the process list. Operators should prefer .env / environment variables. + args.twitter_username = args.twitter_username or os.getenv( + "TWITTER_USERNAME" ) + args.twitter_password = args.twitter_password or os.getenv( + "TWITTER_PASSWORD" + ) + args.twitter_email = args.twitter_email or os.getenv("TWITTER_EMAIL") + args.bsky_handle = args.bsky_handle or os.getenv("BSKY_HANDLE") + args.bsky_password = args.bsky_password or os.getenv("BSKY_APP_PASSWORD") + args.twitter_handle = ( + args.twitter_handle + or os.getenv("TWITTER_HANDLE") + or args.twitter_username + ) + args.bsky_base_url = ( + args.bsky_base_url + if args.bsky_base_url + else DEFAULT_BSKY_BASE_URL + ) + + # --- Language handling: CLI > env > default (Catalan) --- + raw_langs = args.bsky_langs or os.getenv("BSKY_LANGS") + if raw_langs: + args.bsky_langs = [ + lang.strip() + for lang in raw_langs.split(",") + if lang.strip() + ] + logging.info( + f"🌍 Using configured Bluesky languages: {args.bsky_langs}" + ) + else: + args.bsky_langs = DEFAULT_BSKY_LANGS + logging.info( + f"🌍 Using default Bluesky languages: {args.bsky_langs}" + ) + + missing_args = [] + if not args.twitter_username: + missing_args.append("--twitter-username / TWITTER_USERNAME") + if not args.twitter_password: + missing_args.append("--twitter-password / TWITTER_PASSWORD") + if not args.bsky_handle: + missing_args.append("--bsky-handle / BSKY_HANDLE") + if not args.bsky_password: + missing_args.append("--bsky-password / BSKY_APP_PASSWORD") + + if missing_args: + logging.error( + f"❌ Missing credentials! You forgot to provide: " + f"{', '.join(missing_args)}" + ) + return + + logging.info(f"πŸ€– Bot started. 
Will check @{args.twitter_handle}") + logging.info( + f"🌍 Posting destination base URL: {args.bsky_base_url}" + ) + + if args.dry_run: + logging.info( + "πŸ§ͺ DRY RUN MODE ENABLED β€” no posts will be created." + ) + + reset_caches() + sync_feeds(args) + logging.info("πŸ€– Bot finished.") if __name__ == "__main__": - main() + main() \ No newline at end of file