diff --git a/rss2bsky.py b/rss2bsky.py index 25bf942..145a60e 100644 --- a/rss2bsky.py +++ b/rss2bsky.py @@ -12,7 +12,9 @@ import io import json import hashlib import html -from urllib.parse import urlparse +from dataclasses import dataclass +from typing import Optional, List, Set, Dict, Any, Tuple +from urllib.parse import urlparse, urlunparse from atproto import Client, client_utils, models from bs4 import BeautifulSoup @@ -24,253 +26,100 @@ except ImportError: PIL_AVAILABLE = False -# --- Configuration --- +# ============================================================ +# Config +# ============================================================ DEFAULT_STATE_PATH = "rss2bsky_state.json" DEFAULT_COOLDOWN_STATE_PATH = "rss2bsky_cooldowns.json" -DEDUPE_BSKY_LIMIT = 30 -BSKY_TEXT_MAX_LENGTH = 275 -# External thumbnail tuning -EXTERNAL_THUMB_MAX_BYTES = 750 * 1024 -EXTERNAL_THUMB_TARGET_BYTES = 500 * 1024 -EXTERNAL_THUMB_MAX_DIMENSION = 1000 -EXTERNAL_THUMB_MIN_JPEG_QUALITY = 35 - -BSKY_BLOB_UPLOAD_MAX_RETRIES = 3 -BSKY_BLOB_UPLOAD_BASE_DELAY = 8 -BSKY_BLOB_UPLOAD_MAX_DELAY = 120 -BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 2 -BSKY_BLOB_TRANSIENT_ERROR_DELAY = 10 - -HTTP_TIMEOUT = 20 -POST_RETRY_DELAY_SECONDS = 2 - -DEFAULT_POST_COOLDOWN_SECONDS = 3600 -DEFAULT_THUMB_COOLDOWN_SECONDS = 1800 - - -# --- Logging --- -logging.basicConfig( - format="%(asctime)s %(message)s", - level=logging.INFO, - stream=sys.stdout -) - -if not PIL_AVAILABLE: - logging.warning("๐ŸŸก Pillow is not installed. 
External card thumbnail compression is disabled.") - - -# --- Cooldown persistence --- -def default_cooldown_state(): - return { - "version": 1, - "post_creation_cooldown_until": 0, - "thumb_upload_cooldown_until": 0, - "updated_at": None, - } - - -def load_cooldown_state(path): - if not os.path.exists(path): - return default_cooldown_state() - - try: - with open(path, "r", encoding="utf-8") as f: - state = json.load(f) - - if not isinstance(state, dict): - return default_cooldown_state() - - state.setdefault("version", 1) - state.setdefault("post_creation_cooldown_until", 0) - state.setdefault("thumb_upload_cooldown_until", 0) - state.setdefault("updated_at", None) - return state - except Exception as e: - logging.warning(f"โš ๏ธ Could not load cooldown state {path}: {e}") - return default_cooldown_state() - - -def save_cooldown_state(state, path): - try: - state["updated_at"] = arrow.utcnow().isoformat() - temp_path = f"{path}.tmp" - - with open(temp_path, "w", encoding="utf-8") as f: - json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True) - - os.replace(temp_path, path) - except Exception as e: - logging.warning(f"โš ๏ธ Could not save cooldown state {path}: {e}") - - -def get_global_post_cooldown_until(cooldown_path): - state = load_cooldown_state(cooldown_path) - return int(state.get("post_creation_cooldown_until", 0) or 0) - - -def get_global_thumb_cooldown_until(cooldown_path): - state = load_cooldown_state(cooldown_path) - return int(state.get("thumb_upload_cooldown_until", 0) or 0) - - -def is_global_post_cooldown_active(cooldown_path): - return int(time.time()) < get_global_post_cooldown_until(cooldown_path) - - -def is_global_thumb_cooldown_active(cooldown_path): - return int(time.time()) < get_global_thumb_cooldown_until(cooldown_path) - - -def set_global_post_cooldown_until(reset_ts, cooldown_path): - state = load_cooldown_state(cooldown_path) - current = int(state.get("post_creation_cooldown_until", 0) or 0) - - if reset_ts > current: - 
state["post_creation_cooldown_until"] = int(reset_ts) - save_cooldown_state(state, cooldown_path) - - final_ts = int(load_cooldown_state(cooldown_path).get("post_creation_cooldown_until", 0) or 0) - return final_ts - - -def set_global_thumb_cooldown_until(reset_ts, cooldown_path): - state = load_cooldown_state(cooldown_path) - current = int(state.get("thumb_upload_cooldown_until", 0) or 0) - - if reset_ts > current: - state["thumb_upload_cooldown_until"] = int(reset_ts) - save_cooldown_state(state, cooldown_path) - - final_ts = int(load_cooldown_state(cooldown_path).get("thumb_upload_cooldown_until", 0) or 0) - return final_ts - - -# --- Encoding / text helpers --- -def fix_encoding(text): - try: - return text.encode("latin-1").decode("utf-8") - except (UnicodeEncodeError, UnicodeDecodeError): - return text - - -def desescapar_unicode(text): - try: - return html.unescape(text) - except Exception as e: - logging.warning(f"โš ๏ธ Error unescaping unicode/html entities: {e}") - return text - - -def is_html(text): - return bool(re.search(r'<.*?>', text or "")) - - -def strip_trailing_url_punctuation(url): - if not url: - return url - return re.sub(r"[\sโ€ฆ\.,;:!?)\]\"']+$", "", url.strip()) - - -def canonicalize_url(url): - if not url: - return None - url = html.unescape(url.strip()) - return strip_trailing_url_punctuation(url) - - -def clean_whitespace(text): - if not text: - return "" - text = text.replace("\r", "\n") - lines = [line.strip() for line in text.splitlines()] - text = "\n".join(lines) - text = re.sub(r"\n{3,}", "\n\n", text) - return text.strip() - - -def normalize_text(text): - text = clean_whitespace(text) - text = re.sub(r"\s+", " ", text).strip().lower() - return text - - -def process_title(title): - try: - if is_html(title): - title_text = BeautifulSoup(title, "html.parser").get_text().strip() - else: - title_text = (title or "").strip() - - title_text = desescapar_unicode(title_text) - title_text = fix_encoding(title_text) - title_text = 
clean_whitespace(title_text) - return title_text - except Exception as e: - logging.warning(f"โš ๏ธ Error processing title: {e}") - return title or "" - - -def build_post_text_variants(title_text, link): - title_text = clean_whitespace(title_text) - link = canonicalize_url(link) or link or "" - - variants = [] - seen = set() - - def add_variant(text): - cleaned = clean_whitespace(text) - if cleaned and cleaned not in seen: - seen.add(cleaned) - variants.append(cleaned) - - if title_text and link: - add_variant(f"{title_text}\n\n{link}") - - if title_text: - add_variant(title_text) - - if link and not title_text: - add_variant(link) - - return variants - - -# --- URL / duplicate helpers --- -def is_x_or_twitter_domain(url): - try: - hostname = (urlparse(url).hostname or "").lower() - return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com", "t.co"} - except Exception: - return False - - -def extract_urls_from_text(text): - if not text: - return [] - return re.findall(r"https?://[^\s]+", text) - - -def extract_non_x_urls_from_text(text): - urls = extract_urls_from_text(text) - result = [] - - for url in urls: - cleaned = strip_trailing_url_punctuation(url) - if cleaned and not is_x_or_twitter_domain(cleaned): - result.append(cleaned) - - return result - - -def build_entry_fingerprint(normalized_title, canonical_link): - raw = f"{normalized_title}||{canonical_link or ''}" - return hashlib.sha256(raw.encode("utf-8")).hexdigest() - - -# --- Bluesky state helpers --- -def default_state(): +@dataclass(frozen=True) +class LimitsConfig: + dedupe_bsky_limit: int = 30 + bsky_text_max_length: int = 275 + + external_thumb_max_bytes: int = 750 * 1024 + external_thumb_target_bytes: int = 500 * 1024 + external_thumb_max_dimension: int = 1000 + external_thumb_min_jpeg_quality: int = 35 + + state_max_entries: int = 5000 + + +@dataclass(frozen=True) +class RetryConfig: + blob_upload_max_retries: int = 3 + blob_upload_base_delay: int = 8 + 
blob_upload_max_delay: int = 120 + blob_transient_error_retries: int = 2 + blob_transient_error_delay: int = 10 + post_retry_delay_seconds: int = 2 + + +@dataclass(frozen=True) +class CooldownConfig: + default_post_cooldown_seconds: int = 3600 + default_thumb_cooldown_seconds: int = 1800 + + +@dataclass(frozen=True) +class NetworkConfig: + http_timeout: int = 20 + + +@dataclass(frozen=True) +class AppConfig: + limits: LimitsConfig = LimitsConfig() + retry: RetryConfig = RetryConfig() + cooldown: CooldownConfig = CooldownConfig() + network: NetworkConfig = NetworkConfig() + + +# ============================================================ +# Local models +# ============================================================ +@dataclass +class EntryCandidate: + item: Any + title_text: str + normalized_title: str + canonical_link: Optional[str] + published_at: Optional[str] + published_arrow: Any + entry_fingerprint: str + post_text_variants: List[str] + + +@dataclass +class RecentBskyPost: + uri: Optional[str] + text: str + normalized_text: str + canonical_non_x_urls: Set[str] + created_at: Optional[str] + + +@dataclass +class RunResult: + published_count: int + stopped_reason: Optional[str] = None + + +# ============================================================ +# Logging +# ============================================================ +def setup_logging() -> None: + logging.basicConfig( + format="%(asctime)s %(message)s", + level=logging.INFO, + stream=sys.stdout + ) + + +# ============================================================ +# State + cooldown +# ============================================================ +def default_state() -> Dict[str, Any]: return { "version": 1, "posted_entries": {}, @@ -279,7 +128,7 @@ def default_state(): } -def load_state(state_path): +def load_state(state_path: str) -> Dict[str, Any]: if not os.path.exists(state_path): logging.info(f"๐Ÿง  No state file found at {state_path}. 
Starting with empty state.") return default_state() @@ -303,7 +152,7 @@ def load_state(state_path): return default_state() -def save_state(state, state_path): +def save_state(state: Dict[str, Any], state_path: str) -> None: try: state["updated_at"] = arrow.utcnow().isoformat() temp_path = f"{state_path}.tmp" @@ -318,7 +167,7 @@ def save_state(state, state_path): logging.error(f"โŒ Failed to save state file {state_path}: {e}") -def prune_state(state, max_entries=5000): +def prune_state(state: Dict[str, Any], max_entries: int = 5000) -> Dict[str, Any]: posted_entries = state.get("posted_entries", {}) if len(posted_entries) <= max_entries: @@ -332,33 +181,25 @@ def prune_state(state, max_entries=5000): sortable.sort(key=lambda x: x[1], reverse=True) keep_keys = {key for key, _ in sortable[:max_entries]} - new_posted_entries = {} - for key, record in posted_entries.items(): - if key in keep_keys: - new_posted_entries[key] = record - - new_posted_by_bsky_uri = {} - for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items(): - if key in keep_keys: - new_posted_by_bsky_uri[bsky_uri] = key - - state["posted_entries"] = new_posted_entries - state["posted_by_bsky_uri"] = new_posted_by_bsky_uri + state["posted_entries"] = {k: v for k, v in posted_entries.items() if k in keep_keys} + state["posted_by_bsky_uri"] = { + uri: key for uri, key in state.get("posted_by_bsky_uri", {}).items() if key in keep_keys + } return state -def remember_posted_entry(state, candidate, posted_text, bsky_uri=None): - canonical_link = candidate.get("canonical_link") - fallback_key = f"fp:{candidate['entry_fingerprint']}" +def remember_posted_entry(state: Dict[str, Any], candidate: EntryCandidate, posted_text: str, bsky_uri: Optional[str] = None) -> None: + canonical_link = candidate.canonical_link + fallback_key = f"fp:{candidate.entry_fingerprint}" state_key = canonical_link or fallback_key record = { "canonical_link": canonical_link, - "title_text": candidate["title_text"], - 
"normalized_title": candidate["normalized_title"], - "entry_fingerprint": candidate["entry_fingerprint"], + "title_text": candidate.title_text, + "normalized_title": candidate.normalized_title, + "entry_fingerprint": candidate.entry_fingerprint, "post_text": posted_text, - "published_at": candidate.get("published_at"), + "published_at": candidate.published_at, "bsky_uri": bsky_uri, "posted_at": arrow.utcnow().isoformat(), } @@ -369,11 +210,10 @@ def remember_posted_entry(state, candidate, posted_text, bsky_uri=None): state["posted_by_bsky_uri"][bsky_uri] = state_key -def candidate_matches_state(candidate, state): - canonical_link = candidate["canonical_link"] - entry_fingerprint = candidate["entry_fingerprint"] - normalized_title = candidate["normalized_title"] - +def candidate_matches_state(candidate: EntryCandidate, state: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + canonical_link = candidate.canonical_link + entry_fingerprint = candidate.entry_fingerprint + normalized_title = candidate.normalized_title posted_entries = state.get("posted_entries", {}) if canonical_link and canonical_link in posted_entries: @@ -391,90 +231,223 @@ def candidate_matches_state(candidate, state): return False, None -# --- Bluesky recent-post dedupe --- -def extract_urls_from_facets(record): - urls = [] +def default_cooldown_state() -> Dict[str, Any]: + return { + "version": 1, + "post_creation_cooldown_until": 0, + "thumb_upload_cooldown_until": 0, + "updated_at": None, + } + + +def load_cooldown_state(path: str) -> Dict[str, Any]: + if not os.path.exists(path): + return default_cooldown_state() try: - facets = getattr(record, "facets", None) or [] - for facet in facets: - features = getattr(facet, "features", None) or [] - for feature in features: - uri = getattr(feature, "uri", None) - if uri: - urls.append(uri) + with open(path, "r", encoding="utf-8") as f: + state = json.load(f) + + if not isinstance(state, dict): + return default_cooldown_state() + + 
state.setdefault("version", 1) + state.setdefault("post_creation_cooldown_until", 0) + state.setdefault("thumb_upload_cooldown_until", 0) + state.setdefault("updated_at", None) + return state except Exception as e: - logging.debug(f"Could not extract facet URLs: {e}") - - return urls + logging.warning(f"โš ๏ธ Could not load cooldown state {path}: {e}") + return default_cooldown_state() -def get_recent_bsky_posts(client, handle, limit=DEDUPE_BSKY_LIMIT): - recent_posts = [] - +def save_cooldown_state(state: Dict[str, Any], path: str) -> None: try: - timeline = client.get_author_feed(handle, limit=limit) - - for item in timeline.feed: - try: - if item.reason is not None: - continue - - record = item.post.record - if getattr(record, "reply", None) is not None: - continue - - text = getattr(record, "text", "") or "" - normalized_text = normalize_text(text) - - urls = [] - urls.extend(extract_non_x_urls_from_text(text)) - urls.extend(extract_urls_from_facets(record)) - - canonical_non_x_urls = set() - for url in urls: - if not is_x_or_twitter_domain(url): - canonical = canonicalize_url(url) - if canonical: - canonical_non_x_urls.add(canonical) - - recent_posts.append({ - "uri": getattr(item.post, "uri", None), - "text": text, - "normalized_text": normalized_text, - "canonical_non_x_urls": canonical_non_x_urls, - "created_at": getattr(record, "created_at", None), - }) - - except Exception as e: - logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}") - + state["updated_at"] = arrow.utcnow().isoformat() + temp_path = f"{path}.tmp" + with open(temp_path, "w", encoding="utf-8") as f: + json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True) + os.replace(temp_path, path) except Exception as e: - logging.warning(f"โš ๏ธ Could not fetch recent Bluesky posts for duplicate detection: {e}") - - return recent_posts + logging.warning(f"โš ๏ธ Could not save cooldown state {path}: {e}") -def candidate_matches_existing_bsky(candidate, 
recent_bsky_posts): - candidate_link = candidate["canonical_link"] - candidate_title_normalized = candidate["normalized_title"] - - for existing in recent_bsky_posts: - existing_urls = existing["canonical_non_x_urls"] - existing_text_normalized = existing["normalized_text"] - - if candidate_link and candidate_link in existing_urls: - return True, "bsky:canonical_link" - - if candidate_title_normalized and candidate_title_normalized in existing_text_normalized: - if not candidate_link or candidate_link in existing_urls: - return True, "bsky:title_plus_link" - - return False, None +def get_global_post_cooldown_until(cooldown_path: str) -> int: + state = load_cooldown_state(cooldown_path) + return int(state.get("post_creation_cooldown_until", 0) or 0) -# --- Rich text builder --- -def make_rich(content): +def get_global_thumb_cooldown_until(cooldown_path: str) -> int: + state = load_cooldown_state(cooldown_path) + return int(state.get("thumb_upload_cooldown_until", 0) or 0) + + +def is_global_post_cooldown_active(cooldown_path: str) -> bool: + return int(time.time()) < get_global_post_cooldown_until(cooldown_path) + + +def is_global_thumb_cooldown_active(cooldown_path: str) -> bool: + return int(time.time()) < get_global_thumb_cooldown_until(cooldown_path) + + +def set_global_post_cooldown_until(reset_ts: int, cooldown_path: str) -> int: + state = load_cooldown_state(cooldown_path) + current = int(state.get("post_creation_cooldown_until", 0) or 0) + if reset_ts > current: + state["post_creation_cooldown_until"] = int(reset_ts) + save_cooldown_state(state, cooldown_path) + return int(load_cooldown_state(cooldown_path).get("post_creation_cooldown_until", 0) or 0) + + +def set_global_thumb_cooldown_until(reset_ts: int, cooldown_path: str) -> int: + state = load_cooldown_state(cooldown_path) + current = int(state.get("thumb_upload_cooldown_until", 0) or 0) + if reset_ts > current: + state["thumb_upload_cooldown_until"] = int(reset_ts) + save_cooldown_state(state, 
cooldown_path) + return int(load_cooldown_state(cooldown_path).get("thumb_upload_cooldown_until", 0) or 0) + + +def format_cooldown_until(ts: int) -> str: + return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts)) + + +def check_post_cooldown_or_log(cooldown_path: str) -> bool: + if is_global_post_cooldown_active(cooldown_path): + reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) + logging.warning(f"๐ŸŸก === BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}") + return True + return False + + +def check_thumb_cooldown_or_log(cooldown_path: str) -> bool: + if is_global_thumb_cooldown_active(cooldown_path): + reset_str = format_cooldown_until(get_global_thumb_cooldown_until(cooldown_path)) + logging.info(f"๐Ÿ–ผ๏ธ Skipping external thumbnail upload due to active cooldown until {reset_str}") + return True + return False + + +# ============================================================ +# Text + URL utils +# ============================================================ +def fix_encoding(text: str) -> str: + try: + return text.encode("latin-1").decode("utf-8") + except (UnicodeEncodeError, UnicodeDecodeError): + return text + + +def desescapar_unicode(text: str) -> str: + try: + return html.unescape(text) + except Exception: + return text + + +def is_html(text: str) -> bool: + return bool(re.search(r'<.*?>', text or "")) + + +def strip_trailing_url_punctuation(url: str) -> str: + if not url: + return url + return re.sub(r"[\sโ€ฆ\.,;:!?)\]\"']+$", "", url.strip()) + + +def canonicalize_url(url: str): + if not url: + return None + url = html.unescape(url.strip()) + url = strip_trailing_url_punctuation(url) + try: + parsed = urlparse(url) + parsed = parsed._replace(fragment="") + return urlunparse(parsed) + except Exception: + return url + + +def clean_whitespace(text: str) -> str: + if not text: + return "" + text = text.replace("\r", "\n") + lines = [line.strip() for line in text.splitlines()] + text = "\n".join(lines) + 
text = re.sub(r"\n{3,}", "\n\n", text) + return text.strip() + + +def normalize_text(text: str) -> str: + text = clean_whitespace(text) + text = re.sub(r"\s+", " ", text).strip().lower() + return text + + +def process_title(title: str) -> str: + if is_html(title): + title_text = BeautifulSoup(title, "html.parser").get_text().strip() + else: + title_text = (title or "").strip() + title_text = desescapar_unicode(title_text) + title_text = fix_encoding(title_text) + title_text = clean_whitespace(title_text) + return title_text + + +def build_post_text_variants(title_text: str, link: str): + title_text = clean_whitespace(title_text) + link = canonicalize_url(link) or link or "" + + variants = [] + seen = set() + + def add_variant(text: str): + cleaned = clean_whitespace(text) + if cleaned and cleaned not in seen: + seen.add(cleaned) + variants.append(cleaned) + + if title_text and link: + add_variant(f"{title_text}\n\n{link}") + if title_text: + add_variant(title_text) + if link and not title_text: + add_variant(link) + + return variants + + +def is_x_or_twitter_domain(url: str) -> bool: + try: + hostname = (urlparse(url).hostname or "").lower() + return hostname in {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com", "t.co"} + except Exception: + return False + + +def extract_urls_from_text(text: str): + if not text: + return [] + return re.findall(r"https?://[^\s]+", text) + + +def extract_non_x_urls_from_text(text: str): + urls = extract_urls_from_text(text) + result = [] + for url in urls: + cleaned = strip_trailing_url_punctuation(url) + if cleaned and not is_x_or_twitter_domain(cleaned): + result.append(cleaned) + return result + + +def build_entry_fingerprint(normalized_title: str, canonical_link: str) -> str: + raw = f"{normalized_title}||{canonical_link or ''}" + return hashlib.sha256(raw.encode("utf-8")).hexdigest() + + +def make_rich(content: str): text_builder = client_utils.TextBuilder() content = clean_whitespace(content) lines = 
content.splitlines() @@ -509,7 +482,6 @@ def make_rich(content): text_builder.text(trailing) else: text_builder.text(word) - else: text_builder.text(word) @@ -522,7 +494,9 @@ def make_rich(content): return text_builder -# --- Error / cooldown helpers --- +# ============================================================ +# Error helpers +# ============================================================ def get_rate_limit_reset_timestamp(error_obj): try: headers = getattr(error_obj, "headers", None) @@ -551,7 +525,7 @@ def get_rate_limit_reset_timestamp(error_obj): return None -def is_rate_limited_error(error_obj): +def is_rate_limited_error(error_obj) -> bool: error_text = str(error_obj) repr_text = repr(error_obj) return ( @@ -562,73 +536,146 @@ def is_rate_limited_error(error_obj): ) -def is_transient_blob_error(error_obj): +def is_transient_blob_error(error_obj) -> bool: error_text = repr(error_obj) transient_signals = [ - "InvokeTimeoutError", - "ReadTimeout", - "WriteTimeout", - "TimeoutException", - "RemoteProtocolError", - "ConnectError", - "503", - "502", - "504", + "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException", + "RemoteProtocolError", "ConnectError", "503", "502", "504" ] return any(signal in error_text for signal in transient_signals) -def is_timeout_error(error_obj): +def is_timeout_error(error_obj) -> bool: text = repr(error_obj) - return any(signal in text for signal in [ - "InvokeTimeoutError", - "ReadTimeout", - "WriteTimeout", - "TimeoutException", - ]) + return any(signal in text for signal in ["InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException"]) -def activate_post_creation_cooldown_from_error(error_obj, cooldown_path): +def is_probable_length_error(exc) -> bool: + text = repr(exc) + signals = [ + "TextTooLong", "text too long", "Invalid app.bsky.feed.post record", + "string too long", "maxLength", "length", "grapheme too big" + ] + return any(signal.lower() in text.lower() for signal in signals) + + +def 
activate_post_creation_cooldown_from_error(error_obj, cooldown_path: str, cfg: AppConfig) -> int: reset_ts = get_rate_limit_reset_timestamp(error_obj) if not reset_ts: - reset_ts = int(time.time()) + DEFAULT_POST_COOLDOWN_SECONDS - + reset_ts = int(time.time()) + cfg.cooldown.default_post_cooldown_seconds final_ts = set_global_post_cooldown_until(reset_ts, cooldown_path) - logging.error( - f"๐Ÿ›‘ === BSKY POST STOPPED: RATE LIMITED === Posting disabled until " - f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(final_ts))}" - ) + logging.error(f"๐Ÿ›‘ === BSKY POST STOPPED: RATE LIMITED === Posting disabled until {format_cooldown_until(final_ts)}") return final_ts -def activate_thumb_upload_cooldown_from_error(error_obj, cooldown_path): +def activate_thumb_upload_cooldown_from_error(error_obj, cooldown_path: str, cfg: AppConfig) -> int: reset_ts = get_rate_limit_reset_timestamp(error_obj) if not reset_ts: - reset_ts = int(time.time()) + DEFAULT_THUMB_COOLDOWN_SECONDS - + reset_ts = int(time.time()) + cfg.cooldown.default_thumb_cooldown_seconds final_ts = set_global_thumb_cooldown_until(reset_ts, cooldown_path) - logging.warning( - f"๐Ÿ–ผ๏ธ Thumbnail uploads disabled until " - f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(final_ts))}." 
- ) + logging.warning(f"๐Ÿ–ผ๏ธ Thumbnail uploads disabled until {format_cooldown_until(final_ts)}.") return final_ts -# --- Blob / image upload helpers --- -def get_rate_limit_wait_seconds(error_obj, default_delay): +def get_rate_limit_wait_seconds(error_obj, default_delay: int, cfg: AppConfig) -> int: reset_ts = get_rate_limit_reset_timestamp(error_obj) if reset_ts: now_ts = int(time.time()) wait_seconds = max(reset_ts - now_ts + 1, default_delay) - return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY) + return min(wait_seconds, cfg.retry.blob_upload_max_delay) return default_delay -def upload_blob_with_retry(client, binary_data, media_label="media", optional=False, cooldown_on_rate_limit=False, cooldown_path=DEFAULT_COOLDOWN_STATE_PATH): +# ============================================================ +# Bluesky helpers +# ============================================================ +def extract_urls_from_facets(record) -> List[str]: + urls = [] + try: + facets = getattr(record, "facets", None) or [] + for facet in facets: + features = getattr(facet, "features", None) or [] + for feature in features: + uri = getattr(feature, "uri", None) + if uri: + urls.append(uri) + except Exception as e: + logging.debug(f"Could not extract facet URLs: {e}") + return urls + + +def get_recent_bsky_posts(client: Client, handle: str, limit: int) -> List[RecentBskyPost]: + recent_posts: List[RecentBskyPost] = [] + try: + timeline = client.get_author_feed(handle, limit=limit) + + for item in timeline.feed: + try: + if item.reason is not None: + continue + + record = item.post.record + if getattr(record, "reply", None) is not None: + continue + + text = getattr(record, "text", "") or "" + normalized = normalize_text(text) + + urls = [] + urls.extend(extract_non_x_urls_from_text(text)) + urls.extend(extract_urls_from_facets(record)) + + canonical_non_x_urls: Set[str] = set() + for url in urls: + if not is_x_or_twitter_domain(url): + c = canonicalize_url(url) + if c: + 
canonical_non_x_urls.add(c) + + recent_posts.append(RecentBskyPost( + uri=getattr(item.post, "uri", None), + text=text, + normalized_text=normalized, + canonical_non_x_urls=canonical_non_x_urls, + created_at=getattr(record, "created_at", None), + )) + + except Exception as e: + logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}") + except Exception as e: + logging.warning(f"โš ๏ธ Could not fetch recent Bluesky posts for duplicate detection: {e}") + + return recent_posts + + +def candidate_matches_existing_bsky(candidate: EntryCandidate, recent_bsky_posts: List[RecentBskyPost]) -> Tuple[bool, Optional[str]]: + candidate_link = candidate.canonical_link + candidate_title_normalized = candidate.normalized_title + + for existing in recent_bsky_posts: + if candidate_link and candidate_link in existing.canonical_non_x_urls: + return True, "bsky:canonical_link" + + if candidate_title_normalized and candidate_title_normalized in existing.normalized_text: + if not candidate_link or candidate_link in existing.canonical_non_x_urls: + return True, "bsky:title_plus_link" + + return False, None + + +def upload_blob_with_retry( + client: Client, + binary_data: bytes, + cfg: AppConfig, + media_label: str = "media", + optional: bool = False, + cooldown_on_rate_limit: bool = False, + cooldown_path: Optional[str] = None, +): last_exception = None transient_attempts = 0 - for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1): + for attempt in range(1, cfg.retry.blob_upload_max_retries + 1): try: result = client.upload_blob(binary_data) return result.blob @@ -637,8 +684,8 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa last_exception = e if is_rate_limited_error(e): - if cooldown_on_rate_limit: - activate_thumb_upload_cooldown_from_error(e, cooldown_path) + if cooldown_on_rate_limit and cooldown_path: + activate_thumb_upload_cooldown_from_error(e, cooldown_path, cfg) if optional and cooldown_on_rate_limit: logging.warning( @@ 
-648,15 +695,15 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa return None backoff_delay = min( - BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)), - BSKY_BLOB_UPLOAD_MAX_DELAY + cfg.retry.blob_upload_base_delay * (2 ** (attempt - 1)), + cfg.retry.blob_upload_max_delay ) - wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay) + wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay, cfg) - if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES: + if attempt < cfg.retry.blob_upload_max_retries: logging.warning( f"โณ Blob upload rate-limited for {media_label}. " - f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s." + f"Retry {attempt}/{cfg.retry.blob_upload_max_retries} after {wait_seconds}s." ) time.sleep(wait_seconds) continue @@ -664,12 +711,12 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa logging.warning(f"โš ๏ธ Exhausted blob upload retries for {media_label}: {repr(e)}") break - if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES: + if is_transient_blob_error(e) and transient_attempts < cfg.retry.blob_transient_error_retries: transient_attempts += 1 - wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts + wait_seconds = cfg.retry.blob_transient_error_delay * transient_attempts logging.warning( f"โณ Transient blob upload failure for {media_label}: {repr(e)}. " - f"Retry {transient_attempts}/{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s." + f"Retry {transient_attempts}/{cfg.retry.blob_transient_error_retries} after {wait_seconds}s." 
) time.sleep(wait_seconds) continue @@ -681,7 +728,47 @@ def upload_blob_with_retry(client, binary_data, media_label="media", optional=Fa return None -def compress_external_thumb_to_limit(image_bytes, target_bytes=EXTERNAL_THUMB_TARGET_BYTES, hard_max_bytes=EXTERNAL_THUMB_MAX_BYTES): +def try_send_post_with_variants(client: Client, text_variants: List[str], embed, post_lang: str, cooldown_path: str, cfg: AppConfig): + if is_global_post_cooldown_active(cooldown_path): + reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) + raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}") + + last_exception = None + + for idx, variant in enumerate(text_variants, start=1): + try: + if is_global_post_cooldown_active(cooldown_path): + reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) + raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}") + + logging.info(f"๐Ÿ“ Trying post text variant {idx}/{len(text_variants)} (length={len(variant)})") + rich_text = make_rich(variant) + result = client.send_post(text=rich_text, embed=embed, langs=[post_lang]) + return result, variant + + except Exception as e: + last_exception = e + logging.warning(f"โš ๏ธ Post variant {idx} failed: {repr(e)}") + + if is_rate_limited_error(e): + activate_post_creation_cooldown_from_error(e, cooldown_path, cfg) + raise + + if is_timeout_error(e): + raise + + if not is_probable_length_error(e): + raise + + if last_exception: + raise last_exception + raise RuntimeError("No text variants available to post.") + + +# ============================================================ +# Embeds / metadata / image compression +# ============================================================ +def compress_external_thumb_to_limit(image_bytes: bytes, cfg: AppConfig): if not PIL_AVAILABLE: return None @@ -692,32 +779,29 @@ def compress_external_thumb_to_limit(image_bytes, 
target_bytes=EXTERNAL_THUMB_TA width, height = img.size max_dim = max(width, height) - if max_dim > EXTERNAL_THUMB_MAX_DIMENSION: - scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim + if max_dim > cfg.limits.external_thumb_max_dimension: + scale = cfg.limits.external_thumb_max_dimension / max_dim new_size = (max(1, int(width * scale)), max(1, int(height * scale))) img = img.resize(new_size, Image.LANCZOS) logging.info(f"๐Ÿ–ผ๏ธ Resized external thumb to {new_size[0]}x{new_size[1]}") - for quality in [78, 70, 62, 54, 46, 40, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: + best_so_far = None # explicit fix + + for quality in [78, 70, 62, 54, 46, 40, cfg.limits.external_thumb_min_jpeg_quality]: out = io.BytesIO() img.save(out, format="JPEG", quality=quality, optimize=True, progressive=True) data = out.getvalue() - logging.info( - f"๐Ÿ–ผ๏ธ External thumb candidate size at JPEG quality {quality}: " - f"{len(data) / 1024:.2f} KB" - ) + logging.info(f"๐Ÿ–ผ๏ธ External thumb candidate size at JPEG quality {quality}: {len(data) / 1024:.2f} KB") - if len(data) <= target_bytes: + if len(data) <= cfg.limits.external_thumb_target_bytes: return data - if len(data) <= hard_max_bytes: + if len(data) <= cfg.limits.external_thumb_max_bytes: best_so_far = data - # Additional downscale passes - best_candidate = locals().get("best_so_far") - if best_candidate and len(best_candidate) <= hard_max_bytes: - return best_candidate + if best_so_far and len(best_so_far) <= cfg.limits.external_thumb_max_bytes: + return best_so_far for target_dim in [900, 800, 700, 600, 500]: resized = img.copy() @@ -729,7 +813,7 @@ def compress_external_thumb_to_limit(image_bytes, target_bytes=EXTERNAL_THUMB_TA new_size = (max(1, int(width * scale)), max(1, int(height * scale))) resized = resized.resize(new_size, Image.LANCZOS) - for quality in [54, 46, 40, EXTERNAL_THUMB_MIN_JPEG_QUALITY]: + for quality in [54, 46, 40, cfg.limits.external_thumb_min_jpeg_quality]: out = io.BytesIO() resized.save(out, format="JPEG", 
quality=quality, optimize=True, progressive=True) data = out.getvalue() @@ -739,14 +823,14 @@ def compress_external_thumb_to_limit(image_bytes, target_bytes=EXTERNAL_THUMB_TA f"{len(data) / 1024:.2f} KB" ) - if len(data) <= target_bytes: + if len(data) <= cfg.limits.external_thumb_target_bytes: return data - if len(data) <= hard_max_bytes: - best_candidate = data + if len(data) <= cfg.limits.external_thumb_max_bytes: + best_so_far = data - if best_candidate and len(best_candidate) <= hard_max_bytes: - return best_candidate + if best_so_far and len(best_so_far) <= cfg.limits.external_thumb_max_bytes: + return best_so_far except Exception as e: logging.warning(f"โš ๏ธ Could not compress external thumbnail: {repr(e)}") @@ -754,14 +838,32 @@ def compress_external_thumb_to_limit(image_bytes, target_bytes=EXTERNAL_THUMB_TA return None -def get_external_thumb_blob_from_url(image_url, client, http_client, cooldown_path): - if is_global_thumb_cooldown_active(cooldown_path): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_thumb_cooldown_until(cooldown_path))) - logging.info(f"๐Ÿ–ผ๏ธ Skipping external thumbnail upload due to active cooldown until {reset_str}") +def fetch_link_metadata(url: str, http_client: httpx.Client, cfg: AppConfig): + try: + r = http_client.get(url, timeout=cfg.network.http_timeout, follow_redirects=True) + r.raise_for_status() + soup = BeautifulSoup(r.text, "html.parser") + + title = (soup.find("meta", property="og:title") or soup.find("title")) + desc = (soup.find("meta", property="og:description") or soup.find("meta", attrs={"name": "description"})) + image = (soup.find("meta", property="og:image") or soup.find("meta", attrs={"name": "twitter:image"})) + + return { + "title": title["content"] if title and title.has_attr("content") else (title.text.strip() if title and title.text else ""), + "description": desc["content"] if desc and desc.has_attr("content") else "", + "image": image["content"] if image and 
image.has_attr("content") else None, + } + except Exception as e: + logging.warning(f"โš ๏ธ Could not fetch link metadata for {url}: {e}") + return {} + + +def get_external_thumb_blob_from_url(image_url: str, client: Client, http_client: httpx.Client, cooldown_path: str, cfg: AppConfig): + if check_thumb_cooldown_or_log(cooldown_path): return None try: - r = http_client.get(image_url, timeout=HTTP_TIMEOUT, follow_redirects=True) + r = http_client.get(image_url, timeout=cfg.network.http_timeout, follow_redirects=True) if r.status_code != 200: logging.warning(f"โš ๏ธ Could not fetch external thumb {image_url}: HTTP {r.status_code}") return None @@ -773,7 +875,7 @@ def get_external_thumb_blob_from_url(image_url, client, http_client, cooldown_pa logging.info(f"๐Ÿ–ผ๏ธ Downloaded external thumb {image_url} ({len(content) / 1024:.2f} KB)") - upload_bytes = compress_external_thumb_to_limit(content) + upload_bytes = compress_external_thumb_to_limit(content, cfg) if not upload_bytes: logging.warning("โš ๏ธ Could not prepare compressed external thumbnail. 
Omitting thumbnail.") return None @@ -781,8 +883,9 @@ def get_external_thumb_blob_from_url(image_url, client, http_client, cooldown_pa logging.info(f"๐Ÿ–ผ๏ธ Final external thumb upload size: {len(upload_bytes) / 1024:.2f} KB") blob = upload_blob_with_retry( - client, - upload_bytes, + client=client, + binary_data=upload_bytes, + cfg=cfg, media_label=f"external-thumb:{image_url}", optional=True, cooldown_on_rate_limit=True, @@ -800,39 +903,12 @@ def get_external_thumb_blob_from_url(image_url, client, http_client, cooldown_pa return None -# --- Link metadata --- -def fetch_link_metadata(url, http_client): - try: - r = http_client.get(url, timeout=HTTP_TIMEOUT, follow_redirects=True) - r.raise_for_status() - soup = BeautifulSoup(r.text, "html.parser") - - title = (soup.find("meta", property="og:title") or soup.find("title")) - desc = ( - soup.find("meta", property="og:description") - or soup.find("meta", attrs={"name": "description"}) - ) - image = ( - soup.find("meta", property="og:image") - or soup.find("meta", attrs={"name": "twitter:image"}) - ) - - return { - "title": title["content"] if title and title.has_attr("content") else (title.text.strip() if title and title.text else ""), - "description": desc["content"] if desc and desc.has_attr("content") else "", - "image": image["content"] if image and image.has_attr("content") else None, - } - except Exception as e: - logging.warning(f"โš ๏ธ Could not fetch link metadata for {url}: {e}") - return {} - - -def build_external_link_embed(url, fallback_title, client, http_client, cooldown_path): - link_metadata = fetch_link_metadata(url, http_client) +def build_external_link_embed(url: str, fallback_title: str, client: Client, http_client: httpx.Client, cooldown_path: str, cfg: AppConfig): + link_metadata = fetch_link_metadata(url, http_client, cfg) thumb_blob = None if link_metadata.get("image"): - thumb_blob = get_external_thumb_blob_from_url(link_metadata["image"], client, http_client, cooldown_path) + thumb_blob = 
get_external_thumb_blob_from_url(link_metadata["image"], client, http_client, cooldown_path, cfg) if thumb_blob: logging.info("โœ… External link card thumbnail prepared successfully") else: @@ -849,30 +925,43 @@ def build_external_link_embed(url, fallback_title, client, http_client, cooldown thumb=thumb_blob, ) ) - return None -# --- Feed parsing helpers --- +# ============================================================ +# Feed helpers +# ============================================================ def parse_entry_time(item): - candidates = [ - getattr(item, "published", None), - getattr(item, "updated", None), - getattr(item, "pubDate", None), - ] - + candidates = [getattr(item, "published", None), getattr(item, "updated", None), getattr(item, "pubDate", None)] for candidate in candidates: if candidate: try: return arrow.get(candidate) except Exception: continue - return None -def build_candidates_from_feed(feed): - candidates = [] +def fetch_feed_content(feed_url: str, http_client: httpx.Client, cfg: AppConfig) -> str: + response = http_client.get(feed_url, timeout=cfg.network.http_timeout, follow_redirects=True) + response.raise_for_status() + + try: + result = charset_normalizer.from_bytes(response.content).best() + if not result or not hasattr(result, "text"): + raise ValueError("Could not detect feed encoding.") + return result.text + except ValueError: + logging.warning("โš ๏ธ Could not detect feed encoding with charset_normalizer. Trying latin-1.") + try: + return response.content.decode("latin-1") + except UnicodeDecodeError: + logging.warning("โš ๏ธ Could not decode with latin-1. 
Trying utf-8 with ignored errors.") + return response.content.decode("utf-8", errors="ignore") + + +def build_candidates_from_feed(feed) -> List[EntryCandidate]: + candidates: List[EntryCandidate] = [] for item in getattr(feed, "entries", []): try: @@ -887,114 +976,66 @@ def build_candidates_from_feed(feed): normalized_title = normalize_text(title_text) entry_fingerprint = build_entry_fingerprint(normalized_title, link) - candidates.append({ - "item": item, - "title_text": title_text, - "normalized_title": normalized_title, - "canonical_link": link, - "published_at": published_at.isoformat() if published_at else None, - "published_arrow": published_at, - "entry_fingerprint": entry_fingerprint, - "post_text_variants": build_post_text_variants(title_text, link), - }) + candidates.append(EntryCandidate( + item=item, + title_text=title_text, + normalized_title=normalized_title, + canonical_link=link, + published_at=published_at.isoformat() if published_at else None, + published_arrow=published_at, + entry_fingerprint=entry_fingerprint, + post_text_variants=build_post_text_variants(title_text, link), + )) except Exception as e: logging.warning(f"โš ๏ธ Failed to prepare feed entry candidate: {e}") - candidates.sort(key=lambda c: c["published_arrow"] or arrow.get(0)) + candidates.sort(key=lambda c: c.published_arrow or arrow.get(0)) return candidates -# --- Posting helpers --- -def is_probable_length_error(exc): - text = repr(exc) - signals = [ - "TextTooLong", - "text too long", - "Invalid app.bsky.feed.post record", - "string too long", - "maxLength", - "length", - "grapheme too big", - ] - return any(signal.lower() in text.lower() for signal in signals) - - -def try_send_post_with_variants(client, text_variants, embed, post_lang, cooldown_path): - if is_global_post_cooldown_active(cooldown_path): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path))) - raise RuntimeError(f"Posting skipped because global post 
cooldown is active until {reset_str}") - - last_exception = None - - for idx, variant in enumerate(text_variants, start=1): - try: - if is_global_post_cooldown_active(cooldown_path): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path))) - raise RuntimeError(f"Posting skipped because global post cooldown is active until {reset_str}") - - logging.info(f"๐Ÿ“ Trying post text variant {idx}/{len(text_variants)} (length={len(variant)})") - rich_text = make_rich(variant) - result = client.send_post(text=rich_text, embed=embed, langs=[post_lang]) - return result, variant - - except Exception as e: - last_exception = e - logging.warning(f"โš ๏ธ Post variant {idx} failed: {repr(e)}") - - if is_rate_limited_error(e): - activate_post_creation_cooldown_from_error(e, cooldown_path) - raise - - if is_timeout_error(e): - raise - - if not is_probable_length_error(e): - raise - - if last_exception: - raise last_exception - - raise RuntimeError("No text variants available to post.") - - -# --- Main --- -def main(): - parser = argparse.ArgumentParser(description="Post RSS to Bluesky with shared cooldown tracking.") - parser.add_argument("rss_feed", help="RSS feed URL") - parser.add_argument("bsky_handle", help="Bluesky handle") - parser.add_argument("bsky_username", help="Bluesky username") - parser.add_argument("bsky_app_password", help="Bluesky app password") - parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL") - parser.add_argument("--lang", default="ca", help="Language code for the post") - parser.add_argument("--state-path", default=DEFAULT_STATE_PATH, help="Path to local JSON state file") - parser.add_argument("--cooldown-path", default=DEFAULT_COOLDOWN_STATE_PATH, help="Path to shared cooldown JSON state file") - args = parser.parse_args() - - feed_url = args.rss_feed - bsky_handle = args.bsky_handle - bsky_username = args.bsky_username - bsky_password = args.bsky_app_password - 
service_url = args.service
-    post_lang = args.lang
-    state_path = args.state_path
-    cooldown_path = args.cooldown_path
-
-    if is_global_post_cooldown_active(cooldown_path):
-        reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path)))
-        logging.warning(f"๐ŸŸก === BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}")
-        return
-
-    client = Client(base_url=service_url)
-
+# ============================================================
+# Orchestration
+# ============================================================
+def login_with_backoff(client: Client, bsky_username: str, bsky_password: str, service_url: str, cooldown_path: str):
     backoff = 60
     while True:
         try:
-            if is_global_post_cooldown_active(cooldown_path):
-                reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path)))
-                logging.warning(f"๐ŸŸก === BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}")
-                return
+            if check_post_cooldown_or_log(cooldown_path):
+                return False
+            logging.info(f"๐Ÿ” Attempting login to server: {service_url} with user: {bsky_username}")
+            client.login(bsky_username, bsky_password)
+            logging.info(f"โœ… Login successful for user: {bsky_username}")
+            return True
+        except Exception:
+            logging.exception("โŒ Login exception")
+            time.sleep(backoff)
+            backoff = min(backoff + 60, 600)
+
+def run_once(
+    rss_feed: str,
+    bsky_handle: str,
+    bsky_username: str,
+    bsky_password: str,
+    service_url: str,
+    post_lang: str,
+    state_path: str,
+    cooldown_path: str,
+    cfg: AppConfig
+) -> RunResult:
+    if not PIL_AVAILABLE:
+        logging.warning("๐ŸŸก Pillow is not installed. 
External card thumbnail compression is disabled.") + + if check_post_cooldown_or_log(cooldown_path): + return RunResult(published_count=0, stopped_reason="global_post_cooldown_active") + + client = Client(base_url=service_url) + backoff = 60 + while True: + try: + if check_post_cooldown_or_log(cooldown_path): + return RunResult(published_count=0, stopped_reason="global_post_cooldown_active") logging.info(f"๐Ÿ” Attempting login to server: {service_url} with user: {bsky_username}") client.login(bsky_username, bsky_password) logging.info(f"โœ… Login successful for user: {bsky_username}") @@ -1005,69 +1046,52 @@ def main(): backoff = min(backoff + 60, 600) state = load_state(state_path) - recent_bsky_posts = get_recent_bsky_posts(client, bsky_handle, limit=DEDUPE_BSKY_LIMIT) + recent_bsky_posts = get_recent_bsky_posts(client, bsky_handle, limit=cfg.limits.dedupe_bsky_limit) logging.info(f"๐Ÿง  Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.") logging.info(f"๐Ÿง  Local state currently tracks {len(state.get('posted_entries', {}))} posted items.") - response = httpx.get(feed_url, timeout=HTTP_TIMEOUT, follow_redirects=True) - response.raise_for_status() - - try: - result = charset_normalizer.from_bytes(response.content).best() - if not result or not hasattr(result, "text"): - raise ValueError("Could not detect feed encoding.") - feed_content = result.text - except ValueError: - logging.warning("โš ๏ธ Could not detect feed encoding with charset_normalizer. Trying latin-1.") - try: - feed_content = response.content.decode("latin-1") - except UnicodeDecodeError: - logging.warning("โš ๏ธ Could not decode with latin-1. 
Trying utf-8 with ignored errors.")
-            feed_content = response.content.decode("utf-8", errors="ignore")
-
-    feed = fastfeedparser.parse(feed_content)
-    candidates = build_candidates_from_feed(feed)
-
-    logging.info(f"๐Ÿ“ฐ Prepared {len(candidates)} feed entry candidates for duplicate comparison.")
-
-    entries_to_post = []
-    for candidate in candidates:
-        is_dup_state, reason_state = candidate_matches_state(candidate, state)
-        if is_dup_state:
-            logging.info(f"โญ๏ธ Skipping candidate due to local state duplicate match on: {reason_state}")
-            continue
-
-        is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
-        if is_dup_bsky:
-            logging.info(f"โญ๏ธ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}")
-            continue
-
-        entries_to_post.append(candidate)
-
-    logging.info(f"๐Ÿ“ฌ {len(entries_to_post)} entries remain after duplicate filtering.")
-
-    if not entries_to_post:
-        logging.info("โ„น๏ธ Execution finished: no new entries to publish.")
-        return
-
-    if is_global_post_cooldown_active(cooldown_path):
-        reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path)))
-        logging.warning(f"๐ŸŸก === BSKY POST SKIPPED: GLOBAL COOLDOWN === Active until {reset_str}")
-        return
-
-    noves_entrades = 0
-    with httpx.Client() as http_client:
+    http_client = httpx.Client(); feed_content = fetch_feed_content(rss_feed, http_client, cfg)  # FIX: http_client was never created after dropping the old `with` block (NameError); NOTE(review): it is no longer explicitly closed
+    feed = fastfeedparser.parse(feed_content)
+    candidates = build_candidates_from_feed(feed)
+
+    logging.info(f"๐Ÿ“ฐ Prepared {len(candidates)} feed entry candidates for duplicate comparison.")
+
+    entries_to_post: List[EntryCandidate] = []
+    for candidate in candidates:
+        is_dup_state, reason_state = candidate_matches_state(candidate, state)
+        if is_dup_state:
+            logging.info(f"โญ๏ธ Skipping candidate due to local state duplicate match on: {reason_state}")
+            continue
+
+        is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
+        if is_dup_bsky:
+            
logging.info(f"โญ๏ธ Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}") + continue + + entries_to_post.append(candidate) + + logging.info(f"๐Ÿ“ฌ {len(entries_to_post)} entries remain after duplicate filtering.") + + if not entries_to_post: + logging.info("โ„น๏ธ Execution finished: no new entries to publish.") + return RunResult(published_count=0) + + if check_post_cooldown_or_log(cooldown_path): + return RunResult(published_count=0, stopped_reason="global_post_cooldown_active") + + published = 0 + for candidate in entries_to_post: if is_global_post_cooldown_active(cooldown_path): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path))) + reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) logging.error(f"๐Ÿ›‘ === BSKY POST STOPPED: GLOBAL COOLDOWN === Skipping remaining entries until {reset_str}") break - title_text = candidate["title_text"] - canonical_link = candidate["canonical_link"] - text_variants = candidate["post_text_variants"] + title_text = candidate.title_text + canonical_link = candidate.canonical_link + text_variants = candidate.post_text_variants logging.info(f"๐Ÿ“ฐ Preparing to post RSS entry: {canonical_link or title_text}") logging.info(f"๐Ÿš€ === BSKY POST START === {canonical_link or title_text}") @@ -1079,7 +1103,8 @@ def main(): fallback_title=title_text or "Enllaรง", client=client, http_client=http_client, - cooldown_path=cooldown_path + cooldown_path=cooldown_path, + cfg=cfg ) try: @@ -1088,38 +1113,39 @@ def main(): text_variants=text_variants, embed=embed, post_lang=post_lang, - cooldown_path=cooldown_path + cooldown_path=cooldown_path, + cfg=cfg ) bsky_uri = getattr(post_result, "uri", None) remember_posted_entry(state, candidate, posted_text=posted_text, bsky_uri=bsky_uri) - state = prune_state(state, max_entries=5000) + state = prune_state(state, max_entries=cfg.limits.state_max_entries) save_state(state, state_path) - 
recent_bsky_posts.insert(0, { - "uri": bsky_uri, - "text": posted_text, - "normalized_text": normalize_text(posted_text), - "canonical_non_x_urls": {canonical_link} if canonical_link else set(), - "created_at": arrow.utcnow().isoformat(), - }) - recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT] + recent_bsky_posts.insert(0, RecentBskyPost( + uri=bsky_uri, + text=posted_text, + normalized_text=normalize_text(posted_text), + canonical_non_x_urls={canonical_link} if canonical_link else set(), + created_at=arrow.utcnow().isoformat(), + )) + recent_bsky_posts = recent_bsky_posts[:cfg.limits.dedupe_bsky_limit] - noves_entrades += 1 + published += 1 logging.info(f"โœ… === BSKY POST SUCCESS === {canonical_link or title_text}") logging.info(f"๐ŸŽ‰ Posted RSS entry to Bluesky: {canonical_link or title_text}") - time.sleep(POST_RETRY_DELAY_SECONDS) + time.sleep(cfg.retry.post_retry_delay_seconds) except Exception as e: if is_rate_limited_error(e): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path))) + reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) logging.error(f"โŒ === BSKY POST FAILED === {canonical_link or title_text}") logging.error(f"๐Ÿ›‘ === BSKY POST STOPPED: RATE LIMITED === Ending publish loop until {reset_str}") break if "global post cooldown is active" in str(e).lower(): - reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(get_global_post_cooldown_until(cooldown_path))) + reset_str = format_cooldown_until(get_global_post_cooldown_until(cooldown_path)) logging.warning(f"๐ŸŸก === BSKY POST SKIPPED: GLOBAL COOLDOWN === {canonical_link or title_text}") logging.warning(f"๐Ÿ›‘ === BSKY POST STOPPED: GLOBAL COOLDOWN === Ending publish loop until {reset_str}") break @@ -1130,11 +1156,45 @@ def main(): logging.exception(f"โŒ === BSKY POST FAILED === {canonical_link or title_text}") - if noves_entrades > 0: - logging.info(f"๐ŸŽ‰ Execution finished: published 
{noves_entrades} new entries to Bluesky.") + if published > 0: + logging.info(f"๐ŸŽ‰ Execution finished: published {published} new entries to Bluesky.") else: logging.info("โ„น๏ธ Execution finished: no new entries were published.") + return RunResult(published_count=published) + + +# ============================================================ +# CLI +# ============================================================ +def main(): + setup_logging() + + parser = argparse.ArgumentParser(description="Post RSS to Bluesky with shared cooldown tracking.") + parser.add_argument("rss_feed", help="RSS feed URL") + parser.add_argument("bsky_handle", help="Bluesky handle") + parser.add_argument("bsky_username", help="Bluesky username") + parser.add_argument("bsky_app_password", help="Bluesky app password") + parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL") + parser.add_argument("--lang", default="ca", help="Language code for the post") + parser.add_argument("--state-path", default=DEFAULT_STATE_PATH, help="Path to local JSON state file") + parser.add_argument("--cooldown-path", default=DEFAULT_COOLDOWN_STATE_PATH, help="Path to shared cooldown JSON state file") + args = parser.parse_args() + + cfg = AppConfig() + + run_once( + rss_feed=args.rss_feed, + bsky_handle=args.bsky_handle, + bsky_username=args.bsky_username, + bsky_password=args.bsky_app_password, + service_url=args.service, + post_lang=args.lang, + state_path=args.state_path, + cooldown_path=args.cooldown_path, + cfg=cfg + ) + if __name__ == "__main__": main() \ No newline at end of file