"""Post new RSS feed entries to a Bluesky account, with duplicate detection.

Duplicate detection uses two layers:
  1. A local JSON state file recording every entry already posted.
  2. The account's most recent Bluesky posts (link / text / title matching).

Usage: rss2bsky.py RSS_FEED BSKY_HANDLE BSKY_USERNAME BSKY_APP_PASSWORD
"""

import argparse
import hashlib
import html
import io
import json
import logging
import os
import re
import sys
import time
from urllib.parse import urlparse

import arrow
import charset_normalizer
import fastfeedparser
import httpx
from atproto import Client, client_utils, models
from bs4 import BeautifulSoup
from PIL import Image

# --- Configuration ---
STATE_PATH = "rss2bsky_state.json"
DEDUPE_BSKY_LIMIT = 30                      # recent Bluesky posts fetched for dedupe
BSKY_TEXT_MAX_LENGTH = 275                  # conservative post length budget
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024       # Bluesky blob size limit (with margin)
EXTERNAL_THUMB_MAX_DIMENSION = 1200         # px, longest side of link-card thumbnail
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
BSKY_BLOB_UPLOAD_BASE_DELAY = 10            # seconds, exponential backoff base
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15
HTTP_TIMEOUT = 20

# --- Logging ---
logging.basicConfig(
    format="%(asctime)s %(message)s", level=logging.INFO, stream=sys.stdout
)


# --- Encoding / text helpers ---
def fix_encoding(text):
    """Heuristically repair latin-1/utf-8 mojibake; return input unchanged on failure."""
    try:
        return text.encode("latin-1").decode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        logging.warning(f"Error correcting encoding: {text}")
        return text


def desescapar_unicode(text):
    """Unescape HTML entities (e.g. &amp;, &#8217;); return input unchanged on failure."""
    try:
        return html.unescape(text)
    except Exception as e:
        logging.warning(f"Error unescaping unicode/html entities: {e}")
        return text


def is_html(text):
    """Return True if the text appears to contain an HTML/XML tag."""
    return bool(re.search(r'<.*?>', text or ""))


def strip_trailing_url_punctuation(url):
    """Strip punctuation/whitespace that commonly trails a URL pasted into text."""
    if not url:
        return url
    return re.sub(r"[\s…\.,;:!?)\]\"']+$", "", url.strip())


def canonicalize_url(url):
    """Return a normalized URL for comparison, or None for empty input."""
    if not url:
        return None
    return strip_trailing_url_punctuation(url.strip())


def clean_whitespace(text):
    """Normalize newlines, strip per-line whitespace, collapse 3+ blank lines to one."""
    if not text:
        return ""
    text = text.replace("\r", "\n")
    lines = [line.strip() for line in text.splitlines()]
    text = "\n".join(lines)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def normalize_text(text):
    """Lowercase and collapse all whitespace, for fuzzy duplicate comparison."""
    text = clean_whitespace(text)
    text = re.sub(r"\s+", " ", text).strip().lower()
    return text


def process_title(title):
    """Convert a feed title to clean plain text: strip HTML, entities, mojibake."""
    try:
        if is_html(title):
            # NOTE: from_encoding="utf-8" was removed here — BeautifulSoup
            # ignores (and warns about) from_encoding when given str markup.
            title_text = BeautifulSoup(title, "html.parser").get_text().strip()
        else:
            title_text = (title or "").strip()
        title_text = desescapar_unicode(title_text)
        title_text = fix_encoding(title_text)
        title_text = clean_whitespace(title_text)
        return title_text
    except Exception as e:
        logging.warning(f"Error processing title: {e}")
        return title or ""


def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Truncate to max_length, preferring a word boundary, appending '...'."""
    if len(text) <= max_length:
        return text
    truncated = text[:max_length - 3]
    last_space = truncated.rfind(" ")
    if last_space > 0:
        return truncated[:last_space] + "..."
    return truncated + "..."


def build_post_text(title_text, link):
    """
    For RSS posts we usually want 'title + newline + link'.
    If it doesn't fit, prefer truncating the title while keeping the link visible.
    """
    title_text = clean_whitespace(title_text)
    link = canonicalize_url(link) or link
    if not title_text:
        return truncate_text_safely(link)
    combined = f"{title_text}\n{link}"
    if len(combined) <= BSKY_TEXT_MAX_LENGTH:
        return combined
    # Reserve space for the link plus the separating newline.
    reserve = len(link) + 1
    available = BSKY_TEXT_MAX_LENGTH - reserve
    if available <= 10:
        # Link alone nearly fills the budget; truncate the whole thing instead.
        return truncate_text_safely(combined)
    trimmed_title = title_text
    if len(trimmed_title) > available:
        trimmed_title = trimmed_title[:available - 3]
        last_space = trimmed_title.rfind(" ")
        if last_space > 0:
            trimmed_title = trimmed_title[:last_space] + "..."
        else:
            trimmed_title = trimmed_title + "..."
    return f"{trimmed_title}\n{link}"


# --- URL / duplicate helpers ---
def is_x_or_twitter_domain(url):
    """Return True if the URL points at X/Twitter (those links are ignored for dedupe)."""
    try:
        hostname = (urlparse(url).hostname or "").lower()
        return hostname in {"x.com", "www.x.com", "twitter.com",
                            "www.twitter.com", "mobile.twitter.com", "t.co"}
    except Exception:
        return False


def extract_urls_from_text(text):
    """Return all http(s) URLs found in the text (may include trailing punctuation)."""
    if not text:
        return []
    return re.findall(r"https?://[^\s]+", text)


def extract_non_x_urls_from_text(text):
    """Return cleaned URLs from text, excluding X/Twitter domains."""
    urls = extract_urls_from_text(text)
    result = []
    for url in urls:
        cleaned = strip_trailing_url_punctuation(url)
        if cleaned and not is_x_or_twitter_domain(cleaned):
            result.append(cleaned)
    return result


def build_entry_fingerprint(normalized_title, canonical_link):
    """Stable SHA-256 fingerprint of a feed entry (title + link)."""
    raw = f"{normalized_title}||{canonical_link or ''}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


# --- Bluesky state helpers ---
def default_state():
    """Return an empty, well-formed state structure."""
    return {
        "version": 1,
        "posted_entries": {},
        "posted_by_bsky_uri": {},
        "updated_at": None,
    }


def load_state(state_path=STATE_PATH):
    """Load the JSON state file, reinitializing on any error or invalid shape."""
    if not os.path.exists(state_path):
        logging.info(f"No state file found at {state_path}. Starting with empty state.")
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
        if not isinstance(state, dict):
            logging.warning("State file invalid. Reinitializing.")
            return default_state()
        # Backfill any keys missing from older state files.
        state.setdefault("version", 1)
        state.setdefault("posted_entries", {})
        state.setdefault("posted_by_bsky_uri", {})
        state.setdefault("updated_at", None)
        return state
    except Exception as e:
        logging.warning(f"Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()


def save_state(state, state_path=STATE_PATH):
    """Atomically persist state as JSON (write temp file, then os.replace)."""
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        temp_path = f"{state_path}.tmp"
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, state_path)
        logging.info(f"State saved to {state_path}")
    except Exception as e:
        logging.error(f"Failed to save state file {state_path}: {e}")


def prune_state(state, max_entries=5000):
    """Keep only the max_entries most recently posted records (by posted_at)."""
    posted_entries = state.get("posted_entries", {})
    if len(posted_entries) <= max_entries:
        return state
    sortable = []
    for key, record in posted_entries.items():
        posted_at = record.get("posted_at") or ""
        sortable.append((key, posted_at))
    sortable.sort(key=lambda x: x[1], reverse=True)
    keep_keys = {key for key, _ in sortable[:max_entries]}
    new_posted_entries = {}
    for key, record in posted_entries.items():
        if key in keep_keys:
            new_posted_entries[key] = record
    # Drop URI index entries whose target record was pruned.
    new_posted_by_bsky_uri = {}
    for bsky_uri, key in state.get("posted_by_bsky_uri", {}).items():
        if key in keep_keys:
            new_posted_by_bsky_uri[bsky_uri] = key
    state["posted_entries"] = new_posted_entries
    state["posted_by_bsky_uri"] = new_posted_by_bsky_uri
    return state


def remember_posted_entry(state, candidate, bsky_uri=None):
    """Record a successfully posted candidate in state (keyed by link or fingerprint)."""
    canonical_link = candidate.get("canonical_link")
    fallback_key = f"fp:{candidate['entry_fingerprint']}"
    state_key = canonical_link or fallback_key
    record = {
        "canonical_link": canonical_link,
        "title_text": candidate["title_text"],
        "normalized_title": candidate["normalized_title"],
        "entry_fingerprint": candidate["entry_fingerprint"],
        "post_text": candidate["post_text"],
        "published_at": candidate.get("published_at"),
        "bsky_uri": bsky_uri,
        "posted_at": arrow.utcnow().isoformat(),
    }
    state["posted_entries"][state_key] = record
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key


def candidate_matches_state(candidate, state):
    """Return (is_duplicate, reason) by matching candidate against local state."""
    canonical_link = candidate["canonical_link"]
    entry_fingerprint = candidate["entry_fingerprint"]
    normalized_title = candidate["normalized_title"]
    posted_entries = state.get("posted_entries", {})
    if canonical_link and canonical_link in posted_entries:
        return True, "state:canonical_link"
    for _, record in posted_entries.items():
        if record.get("entry_fingerprint") == entry_fingerprint:
            return True, "state:entry_fingerprint"
    for _, record in posted_entries.items():
        if record.get("normalized_title") == normalized_title:
            # Same title counts as a dup only if links agree (or candidate has none).
            if not canonical_link or record.get("canonical_link") == canonical_link:
                return True, "state:normalized_title"
    return False, None


# --- Bluesky recent-post dedupe ---
def extract_urls_from_facets(record):
    """Collect link URIs embedded in a post record's rich-text facets."""
    urls = []
    try:
        facets = getattr(record, "facets", None) or []
        for facet in facets:
            features = getattr(facet, "features", None) or []
            for feature in features:
                uri = getattr(feature, "uri", None)
                if uri:
                    urls.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")
    return urls


def get_recent_bsky_posts(client, handle, limit=DEDUPE_BSKY_LIMIT):
    """Fetch the account's recent original posts (no reposts/replies) for dedupe.

    Returns a list of dicts with normalized text and the canonical non-X URLs
    found in each post's text and facets.
    """
    recent_posts = []
    try:
        timeline = client.get_author_feed(handle, limit=limit)
        for item in timeline.feed:
            try:
                if item.reason is not None:
                    continue  # skip reposts
                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue  # skip replies
                text = getattr(record, "text", "") or ""
                normalized_text = normalize_text(text)
                urls = []
                urls.extend(extract_non_x_urls_from_text(text))
                urls.extend(extract_urls_from_facets(record))
                canonical_non_x_urls = set()
                for url in urls:
                    if not is_x_or_twitter_domain(url):
                        canonical = canonicalize_url(url)
                        if canonical:
                            canonical_non_x_urls.add(canonical)
                recent_posts.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalized_text,
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "created_at": getattr(record, "created_at", None),
                })
            except Exception as e:
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
    except Exception as e:
        logging.warning(f"Could not fetch recent Bluesky posts for duplicate detection: {e}")
    return recent_posts


def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """Return (is_duplicate, reason) by matching candidate against recent posts."""
    candidate_link = candidate["canonical_link"]
    candidate_post_text_normalized = normalize_text(candidate["post_text"])
    candidate_title_normalized = candidate["normalized_title"]
    for existing in recent_bsky_posts:
        existing_urls = existing["canonical_non_x_urls"]
        existing_text_normalized = existing["normalized_text"]
        if candidate_link and candidate_link in existing_urls:
            return True, "bsky:canonical_link"
        if candidate_post_text_normalized == existing_text_normalized:
            return True, "bsky:normalized_post_text"
        if candidate_title_normalized and candidate_title_normalized in existing_text_normalized:
            if candidate_link and candidate_link in existing_urls:
                return True, "bsky:title_plus_link"
    return False, None


# --- Rich text builder ---
def make_rich(content):
    """Build a Bluesky TextBuilder, turning URLs into links and #words into tags."""
    text_builder = client_utils.TextBuilder()
    content = clean_whitespace(content)
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        if not line.strip():
            # Preserve blank lines between paragraphs (but not a trailing one).
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue
        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue
            cleaned_word = strip_trailing_url_punctuation(word)
            if cleaned_word.startswith("http://") or cleaned_word.startswith("https://"):
                text_builder.link(cleaned_word, cleaned_word)
                # Re-emit punctuation stripped from the link as plain text.
                trailing = word[len(cleaned_word):]
                if trailing:
                    text_builder.text(trailing)
            elif cleaned_word.startswith("#") and len(cleaned_word) > 1:
                tag_name = cleaned_word[1:].rstrip(".,;:!?)'\"…")
                if tag_name:
                    text_builder.tag(cleaned_word, tag_name)
                    trailing = word[len(cleaned_word):]
                    if trailing:
                        text_builder.text(trailing)
                else:
                    text_builder.text(word)
            else:
                text_builder.text(word)
            if i < len(words) - 1:
                text_builder.text(" ")
        if line_idx < len(lines) - 1:
            text_builder.text("\n")
    return text_builder


# --- Blob / image upload helpers ---
def get_rate_limit_wait_seconds(error_obj, default_delay):
    """Derive a wait time from a 429's ratelimit-reset header, else default_delay."""
    try:
        headers = getattr(error_obj, "headers", None)
        if headers:
            reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
            if reset_value:
                now_ts = int(time.time())
                reset_ts = int(reset_value)
                wait_seconds = max(reset_ts - now_ts + 1, default_delay)
                return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY)
    except Exception:
        pass
    return default_delay


def is_transient_blob_error(error_obj):
    """Return True if the error looks like a transient network/server failure."""
    error_text = repr(error_obj)
    transient_signals = [
        "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException",
        "RemoteProtocolError", "ConnectError", "503", "502", "504",
    ]
    return any(signal in error_text for signal in transient_signals)


def upload_blob_with_retry(client, binary_data, media_label="media"):
    """Upload a blob, retrying on rate limits (backoff) and transient errors.

    Returns the uploaded blob reference, or None if all attempts fail.
    """
    last_exception = None
    transient_attempts = 0
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            error_text = str(e)
            is_rate_limited = "429" in error_text or "RateLimitExceeded" in error_text
            if is_rate_limited:
                backoff_delay = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(
                        f"Blob upload rate-limited for {media_label}. "
                        f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue
                else:
                    logging.warning(f"Exhausted blob upload retries for {media_label}: {repr(e)}")
                    break
            # Transient errors get their own (smaller) retry budget.
            if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
                transient_attempts += 1
                wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(
                    f"Transient blob upload failure for {media_label}: {repr(e)}. "
                    f"Retry {transient_attempts}/{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            return None
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None


def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES):
    """Re-encode an image as JPEG under max_bytes, lowering quality then dimensions.

    Returns the compressed bytes, or None if the limit cannot be met.
    """
    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            img = img.convert("RGB")
            width, height = img.size
            max_dim = max(width, height)
            if max_dim > EXTERNAL_THUMB_MAX_DIMENSION:
                scale = EXTERNAL_THUMB_MAX_DIMENSION / max_dim
                new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
                img = img.resize(new_size, Image.LANCZOS)
                logging.info(f"Resized external thumb to {new_size[0]}x{new_size[1]}")
            # First pass: keep size, step the JPEG quality down.
            for quality in [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                out = io.BytesIO()
                img.save(out, format="JPEG", quality=quality, optimize=True, progressive=True)
                data = out.getvalue()
                if len(data) <= max_bytes:
                    return data
            # Second pass: progressively shrink dimensions too.
            for target_dim in [1000, 900, 800, 700, 600]:
                resized = img.copy()
                width, height = resized.size
                max_dim = max(width, height)
                if max_dim > target_dim:
                    scale = target_dim / max_dim
                    new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
                    resized = resized.resize(new_size, Image.LANCZOS)
                for quality in [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]:
                    out = io.BytesIO()
                    resized.save(out, format="JPEG", quality=quality, optimize=True, progressive=True)
                    data = out.getvalue()
                    if len(data) <= max_bytes:
                        return data
    except Exception as e:
        logging.warning(f"Could not compress external thumbnail: {repr(e)}")
    return None


def get_external_thumb_blob_from_url(image_url, client, http_client):
    """Download a link-card thumbnail, compress it if oversized, upload as a blob.

    Returns the blob reference, or None if any step fails.
    """
    try:
        r = http_client.get(image_url, timeout=HTTP_TIMEOUT, follow_redirects=True)
        if r.status_code != 200:
            logging.warning(f"Could not fetch external thumb {image_url}: HTTP {r.status_code}")
            return None
        content = r.content
        if not content:
            logging.warning(f"Could not fetch external thumb {image_url}: empty body")
            return None
        upload_bytes = content
        if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES:
            compressed = compress_external_thumb_to_limit(upload_bytes, EXTERNAL_THUMB_MAX_BYTES)
            if compressed:
                upload_bytes = compressed
            else:
                logging.warning("Could not compress external thumb to fit limit. Omitting thumbnail.")
                return None
        return upload_blob_with_retry(client, upload_bytes, media_label=f"external-thumb:{image_url}")
    except Exception as e:
        logging.warning(f"Could not fetch/upload external thumb {image_url}: {repr(e)}")
        return None


# --- Link metadata ---
def fetch_link_metadata(url, http_client):
    """Fetch a page and extract OpenGraph/meta title, description and image URL."""
    try:
        r = http_client.get(url, timeout=HTTP_TIMEOUT, follow_redirects=True)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        title = (soup.find("meta", property="og:title") or soup.find("title"))
        desc = (
            soup.find("meta", property="og:description")
            or soup.find("meta", attrs={"name": "description"})
        )
        image = (
            soup.find("meta", property="og:image")
            or soup.find("meta", attrs={"name": "twitter:image"})
        )
        return {
            "title": title["content"] if title and title.has_attr("content")
            else (title.text.strip() if title and title.text else ""),
            "description": desc["content"] if desc and desc.has_attr("content") else "",
            "image": image["content"] if image and image.has_attr("content") else None,
        }
    except Exception as e:
        logging.warning(f"Could not fetch link metadata for {url}: {e}")
        return {}


def build_external_link_embed(url, fallback_title, client, http_client):
    """Build an external link-card embed (with optional thumbnail), or None."""
    link_metadata = fetch_link_metadata(url, http_client)
    thumb_blob = None
    if link_metadata.get("image"):
        thumb_blob = get_external_thumb_blob_from_url(link_metadata["image"], client, http_client)
        if thumb_blob:
            logging.info("External link card thumbnail prepared successfully")
        else:
            logging.info("External link card will be posted without thumbnail")
    if link_metadata.get("title") or link_metadata.get("description") or thumb_blob:
        return models.AppBskyEmbedExternal.Main(
            external=models.AppBskyEmbedExternal.External(
                uri=url,
                title=link_metadata.get("title") or fallback_title or "Enllaç",
                description=link_metadata.get("description") or "",
                thumb=thumb_blob,
            )
        )
    return None


# --- Feed parsing helpers ---
def parse_entry_time(item):
    """Return the entry's publish/update time as an Arrow, or None if unparsable."""
    candidates = [
        getattr(item, "published", None),
        getattr(item, "updated", None),
        getattr(item, "pubDate", None),
    ]
    for candidate in candidates:
        if candidate:
            try:
                return arrow.get(candidate)
            except Exception:
                continue
    return None


def build_candidates_from_feed(feed):
    """Turn feed entries into candidate dicts (text, link, fingerprint, timestamps).

    Entries with neither title nor link are skipped. Result is sorted oldest first.
    """
    candidates = []
    for item in getattr(feed, "entries", []):
        try:
            title_text = process_title(getattr(item, "title", "") or "")
            link = canonicalize_url(getattr(item, "link", "") or "")
            published_at = parse_entry_time(item)
            if not title_text and not link:
                logging.info("Skipping feed item with no usable title and no link.")
                continue
            post_text = build_post_text(title_text, link or "")
            normalized_title = normalize_text(title_text)
            entry_fingerprint = build_entry_fingerprint(normalized_title, link)
            candidates.append({
                "item": item,
                "title_text": title_text,
                "normalized_title": normalized_title,
                "canonical_link": link,
                "published_at": published_at.isoformat() if published_at else None,
                "published_arrow": published_at,
                "post_text": post_text,
                "entry_fingerprint": entry_fingerprint,
            })
        except Exception as e:
            logging.warning(f"Failed to prepare feed entry candidate: {e}")
    # Sort oldest to newest so posts appear in order
    candidates.sort(key=lambda c: c["published_arrow"] or arrow.get(0))
    return candidates


# --- Main ---
def main():
    """Parse args, log in, fetch the feed, filter duplicates, post new entries."""
    parser = argparse.ArgumentParser(description="Post RSS to Bluesky with JSON state tracking.")
    parser.add_argument("rss_feed", help="RSS feed URL")
    parser.add_argument("bsky_handle", help="Bluesky handle")
    parser.add_argument("bsky_username", help="Bluesky username")
    parser.add_argument("bsky_app_password", help="Bluesky app password")
    parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL")
    parser.add_argument("--lang", default="ca", help="Language code for the post")
    parser.add_argument("--state-path", default=STATE_PATH, help="Path to local JSON state file")
    args = parser.parse_args()

    feed_url = args.rss_feed
    bsky_handle = args.bsky_handle
    bsky_username = args.bsky_username
    bsky_password = args.bsky_app_password
    service_url = args.service
    post_lang = args.lang
    state_path = args.state_path

    # --- Login --- (retry forever with growing backoff, capped at 10 minutes)
    client = Client(base_url=service_url)
    backoff = 60
    while True:
        try:
            logging.info(f"Attempting login to server: {service_url} with user: {bsky_username}")
            client.login(bsky_username, bsky_password)
            logging.info(f"Login successful for user: {bsky_username}")
            break
        except Exception:
            logging.exception("Login exception")
            time.sleep(backoff)
            backoff = min(backoff + 60, 600)

    state = load_state(state_path)
    recent_bsky_posts = get_recent_bsky_posts(client, bsky_handle, limit=DEDUPE_BSKY_LIMIT)
    logging.info(f"Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.")
    logging.info(f"Local state currently tracks {len(state.get('posted_entries', {}))} posted items.")

    # --- Parse feed ---
    response = httpx.get(feed_url, timeout=HTTP_TIMEOUT, follow_redirects=True)
    response.raise_for_status()
    try:
        result = charset_normalizer.from_bytes(response.content).best()
        # NOTE(review): this relies on the best-match object exposing `.text`;
        # if the installed charset_normalizer lacks it, we fall back below.
        if not result or not hasattr(result, "text"):
            raise ValueError("Could not detect feed encoding.")
        feed_content = result.text
    except ValueError:
        logging.warning("Could not detect feed encoding with charset_normalizer. Trying latin-1.")
        try:
            feed_content = response.content.decode("latin-1")
        except UnicodeDecodeError:
            logging.warning("Could not decode with latin-1. Trying utf-8 with ignored errors.")
            feed_content = response.content.decode("utf-8", errors="ignore")

    feed = fastfeedparser.parse(feed_content)
    candidates = build_candidates_from_feed(feed)
    logging.info(f"Prepared {len(candidates)} feed entry candidates for duplicate comparison.")

    # --- Duplicate filtering: local state first, then recent Bluesky posts ---
    entries_to_post = []
    for candidate in candidates:
        is_dup_state, reason_state = candidate_matches_state(candidate, state)
        if is_dup_state:
            logging.info(f"Skipping candidate due to local state duplicate match on: {reason_state}")
            continue
        is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
        if is_dup_bsky:
            logging.info(f"Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}")
            continue
        entries_to_post.append(candidate)
    logging.info(f"{len(entries_to_post)} entries remain after duplicate filtering.")

    if not entries_to_post:
        logging.info("ℹ️ Execution finished: no new entries to publish.")
        return

    noves_entrades = 0
    with httpx.Client() as http_client:
        for candidate in entries_to_post:
            title_text = candidate["title_text"]
            canonical_link = candidate["canonical_link"]
            post_text = candidate["post_text"]
            logging.info(f"Preparing to post RSS entry: {canonical_link or title_text}")
            rich_text = make_rich(post_text)
            embed = None
            if canonical_link:
                embed = build_external_link_embed(
                    canonical_link,
                    fallback_title=title_text or "Enllaç",
                    client=client,
                    http_client=http_client
                )
            try:
                post_result = client.send_post(
                    text=rich_text, embed=embed, langs=[post_lang]
                )
                bsky_uri = getattr(post_result, "uri", None)
                # Persist state after EVERY post so a crash can't cause reposts.
                remember_posted_entry(state, candidate, bsky_uri=bsky_uri)
                state = prune_state(state, max_entries=5000)
                save_state(state, state_path)
                # Keep the in-memory dedupe window consistent with what we just posted.
                recent_bsky_posts.insert(0, {
                    "uri": bsky_uri,
                    "text": post_text,
                    "normalized_text": normalize_text(post_text),
                    "canonical_non_x_urls": {canonical_link} if canonical_link else set(),
                    "created_at": arrow.utcnow().isoformat(),
                })
                recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
                noves_entrades += 1
                logging.info(f"Posted RSS entry to Bluesky: {canonical_link or title_text}")
                time.sleep(2)  # gentle pacing between posts
            except Exception:
                logging.exception(f"Failed to post RSS entry {canonical_link or title_text}")

    if noves_entrades > 0:
        logging.info(f"🎉 Execution finished: published {noves_entrades} new entries to Bluesky.")
    else:
        logging.info("ℹ️ Execution finished: no new entries were published.")


if __name__ == "__main__":
    main()