Files
post2bsky/rss2bsky.py

899 lines
29 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import arrow
import fastfeedparser
import logging
import re
import httpx
import time
import charset_normalizer
import sys
import os
import io
import json
import hashlib
import html
from urllib.parse import urlparse
from atproto import Client, client_utils, models
from bs4 import BeautifulSoup
# Pillow is optional: it is only needed to compress external link-card
# thumbnails; without it the script still posts, just without compression.
try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError:
    Image = None
    PIL_AVAILABLE = False
# --- Configuration ---
STATE_PATH = "rss2bsky_state.json"  # default path of the local JSON dedupe state
DEDUPE_BSKY_LIMIT = 30  # how many recent Bluesky posts to fetch for duplicate detection
BSKY_TEXT_MAX_LENGTH = 275  # text budget used when building post text variants
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024  # size cap for uploaded card thumbnails
EXTERNAL_THUMB_MAX_DIMENSION = 1200  # longest thumbnail side, in pixels
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40  # lowest JPEG quality tried when compressing
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5  # attempts for rate-limited blob uploads
BSKY_BLOB_UPLOAD_BASE_DELAY = 10  # seconds; doubled on each rate-limited retry
BSKY_BLOB_UPLOAD_MAX_DELAY = 300  # seconds; cap on any single retry wait
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3  # retries for transient network/server errors
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15  # seconds; multiplied by the transient attempt number
HTTP_TIMEOUT = 20  # seconds, for all outbound HTTP requests
POST_RETRY_DELAY_SECONDS = 2  # pause after each successful post
# --- Logging ---
logging.basicConfig(
    format="%(asctime)s %(message)s",
    level=logging.INFO,
    stream=sys.stdout
)
if not PIL_AVAILABLE:
    logging.warning("Pillow is not installed. External card thumbnail compression is disabled.")
# --- Encoding / text helpers ---
def fix_encoding(text):
    """Repair latin-1/UTF-8 mojibake; return the text unchanged when not applicable."""
    try:
        raw = text.encode("latin-1")
        return raw.decode("utf-8")
    except (UnicodeDecodeError, UnicodeEncodeError):
        return text
def desescapar_unicode(text):
    """Decode HTML entities (&amp;amp;, &amp;#x27;, ...) into literal characters."""
    try:
        unescaped = html.unescape(text)
    except Exception as e:
        logging.warning(f"Error unescaping unicode/html entities: {e}")
        return text
    return unescaped
def is_html(text):
    """Return True when the text appears to contain an HTML/XML tag."""
    return re.search(r'<.*?>', text or "") is not None
def strip_trailing_url_punctuation(url):
    """Remove surrounding whitespace and trailing sentence punctuation from a URL."""
    if url:
        return re.sub(r"[\s…\.,;:!?)\]\"']+$", "", url.strip())
    return url
def canonicalize_url(url):
    """Normalize a URL for comparison purposes; return None for falsy input."""
    if url:
        return strip_trailing_url_punctuation(url.strip())
    return None
def clean_whitespace(text):
    """Normalize line endings, strip each line, and collapse 3+ blank lines to one."""
    if not text:
        return ""
    stripped_lines = [ln.strip() for ln in text.replace("\r", "\n").splitlines()]
    collapsed = re.sub(r"\n{3,}", "\n\n", "\n".join(stripped_lines))
    return collapsed.strip()
def normalize_text(text):
    """Lowercase and collapse all whitespace to single spaces for fuzzy matching."""
    cleaned = clean_whitespace(text)
    return re.sub(r"\s+", " ", cleaned).strip().lower()
def process_title(title):
    """Reduce a feed title to clean plain text: HTML stripped, entities and mojibake repaired."""
    try:
        if is_html(title):
            text = BeautifulSoup(title, "html.parser", from_encoding="utf-8").get_text().strip()
        else:
            text = (title or "").strip()
        # Pipeline: entity unescape, mojibake repair, whitespace normalization.
        for step in (desescapar_unicode, fix_encoding, clean_whitespace):
            text = step(text)
        return text
    except Exception as e:
        logging.warning(f"Error processing title: {e}")
        return title or ""
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Truncate text to max_length, preferring a word boundary, appending '...'."""
    if len(text) <= max_length:
        return text
    clipped = text[:max_length - 3]
    cut = clipped.rfind(" ")
    if cut > 0:
        clipped = clipped[:cut]
    return clipped + "..."
def build_post_text_variants(title_text, link):
    """
    Build candidate post texts, ordered best-first:
      1. full title + blank line + URL
      2. trimmed title + blank line + URL (when the full form is too long)
      3. full title only
      4. word-safe truncated title only
      5. URL only
    Duplicates (after whitespace cleanup) are emitted once.
    """
    title_text = clean_whitespace(title_text)
    link = canonicalize_url(link) or link or ""
    variants, seen = [], set()

    def push(text):
        candidate = clean_whitespace(text)
        if candidate and candidate not in seen:
            seen.add(candidate)
            variants.append(candidate)

    if title_text and link:
        push(f"{title_text}\n\n{link}")
        # Budget for the title once the URL and blank line are reserved.
        budget = BSKY_TEXT_MAX_LENGTH - (len(link) + 2)
        if budget > 10:
            short = title_text
            if len(short) > budget:
                short = short[:budget - 3]
                cut = short.rfind(" ")
                short = (short[:cut] if cut > 0 else short) + "..."
            push(f"{short}\n\n{link}")
    if title_text:
        push(title_text)
        push(truncate_text_safely(title_text))
    if link:
        push(link)
    return variants
# --- URL / duplicate helpers ---
def is_x_or_twitter_domain(url):
    """True when the URL's host is an X/Twitter domain (including the t.co shortener)."""
    x_hosts = {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com", "t.co"}
    try:
        host = (urlparse(url).hostname or "").lower()
    except Exception:
        return False
    return host in x_hosts
def extract_urls_from_text(text):
    """Return every http(s) URL substring found in text; empty list for falsy input."""
    return re.findall(r"https?://[^\s]+", text) if text else []
def extract_non_x_urls_from_text(text):
    """Return cleaned URLs found in text, excluding X/Twitter domains."""
    cleaned_urls = (strip_trailing_url_punctuation(u) for u in extract_urls_from_text(text))
    return [u for u in cleaned_urls if u and not is_x_or_twitter_domain(u)]
def build_entry_fingerprint(normalized_title, canonical_link):
    """Stable SHA-256 fingerprint over the normalized title plus canonical link."""
    payload = f"{normalized_title}||{canonical_link or ''}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()
# --- Bluesky state helpers ---
def default_state():
    """Return a fresh, empty dedupe-state structure."""
    return dict(
        version=1,
        posted_entries={},
        posted_by_bsky_uri={},
        updated_at=None,
    )
def load_state(state_path=STATE_PATH):
    """Load the JSON state file, falling back to a fresh state on any problem."""
    if not os.path.exists(state_path):
        logging.info(f"No state file found at {state_path}. Starting with empty state.")
        return default_state()
    try:
        with open(state_path, "r", encoding="utf-8") as f:
            state = json.load(f)
    except Exception as e:
        logging.warning(f"Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()
    if not isinstance(state, dict):
        logging.warning("State file invalid. Reinitializing.")
        return default_state()
    # Backfill any keys missing from older state files.
    for key, fallback in (("version", 1), ("posted_entries", {}), ("posted_by_bsky_uri", {}), ("updated_at", None)):
        state.setdefault(key, fallback)
    return state
def save_state(state, state_path=STATE_PATH):
    """Persist state as JSON atomically: write a temp file, then rename over the target."""
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        temp_path = f"{state_path}.tmp"
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, state_path)
    except Exception as e:
        logging.error(f"Failed to save state file {state_path}: {e}")
    else:
        logging.info(f"State saved to {state_path}")
def prune_state(state, max_entries=5000):
    """
    Cap posted_entries at max_entries, keeping the most recently posted records.

    Records are ranked by their "posted_at" value (ISO-8601 strings compare
    lexicographically; missing timestamps sort last). posted_by_bsky_uri is
    pruned to only reference surviving keys. Mutates and returns state.
    """
    posted_entries = state.get("posted_entries", {})
    if len(posted_entries) <= max_entries:
        return state
    # Newest first; '' pushes undated records to the end. The sort is stable,
    # so ties preserve insertion order, matching the previous behavior.
    ranked_keys = sorted(
        posted_entries,
        key=lambda k: posted_entries[k].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = set(ranked_keys[:max_entries])
    state["posted_entries"] = {
        key: record for key, record in posted_entries.items() if key in keep_keys
    }
    state["posted_by_bsky_uri"] = {
        uri: key
        for uri, key in state.get("posted_by_bsky_uri", {}).items()
        if key in keep_keys
    }
    return state
def remember_posted_entry(state, candidate, posted_text, bsky_uri=None):
    """Record a successful post in state, keyed by canonical link (fingerprint fallback)."""
    canonical_link = candidate.get("canonical_link")
    state_key = canonical_link or f"fp:{candidate['entry_fingerprint']}"
    state["posted_entries"][state_key] = {
        "canonical_link": canonical_link,
        "title_text": candidate["title_text"],
        "normalized_title": candidate["normalized_title"],
        "entry_fingerprint": candidate["entry_fingerprint"],
        "post_text": posted_text,
        "published_at": candidate.get("published_at"),
        "bsky_uri": bsky_uri,
        "posted_at": arrow.utcnow().isoformat(),
    }
    if bsky_uri:
        state["posted_by_bsky_uri"][bsky_uri] = state_key
def candidate_matches_state(candidate, state):
    """
    Check a feed candidate against previously posted entries in local state.

    Checks run in priority order: canonical-link key, content fingerprint,
    then normalized title (which additionally requires the links to agree when
    the candidate carries one). Returns (True, reason) on the first match,
    otherwise (False, None).
    """
    canonical_link = candidate["canonical_link"]
    entry_fingerprint = candidate["entry_fingerprint"]
    normalized_title = candidate["normalized_title"]
    posted_entries = state.get("posted_entries", {})
    if canonical_link and canonical_link in posted_entries:
        return True, "state:canonical_link"
    # Iterate .values() directly: the state keys are irrelevant to these checks.
    if any(record.get("entry_fingerprint") == entry_fingerprint
           for record in posted_entries.values()):
        return True, "state:entry_fingerprint"
    for record in posted_entries.values():
        if record.get("normalized_title") == normalized_title:
            if not canonical_link or record.get("canonical_link") == canonical_link:
                return True, "state:normalized_title"
    return False, None
# --- Bluesky recent-post dedupe ---
def extract_urls_from_facets(record):
    """Collect link URIs from a Bluesky record's rich-text facets (best effort)."""
    urls = []
    try:
        for facet in getattr(record, "facets", None) or []:
            for feature in getattr(facet, "features", None) or []:
                uri = getattr(feature, "uri", None)
                if uri:
                    urls.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")
    return urls
def get_recent_bsky_posts(client, handle, limit=DEDUPE_BSKY_LIMIT):
    """
    Fetch the author's latest original posts (reposts and replies excluded)
    and pre-digest each for duplicate detection: normalized text plus the set
    of canonical non-X/Twitter URLs from the text and link facets.
    """
    recent_posts = []
    try:
        timeline = client.get_author_feed(handle, limit=limit)
        for item in timeline.feed:
            try:
                if item.reason is not None:
                    continue  # repost, not an original
                record = item.post.record
                if getattr(record, "reply", None) is not None:
                    continue  # reply, not a top-level post
                text = getattr(record, "text", "") or ""
                found_urls = extract_non_x_urls_from_text(text) + extract_urls_from_facets(record)
                canonical_non_x_urls = {
                    canonical
                    for canonical in (
                        canonicalize_url(u) for u in found_urls if not is_x_or_twitter_domain(u)
                    )
                    if canonical
                }
                recent_posts.append({
                    "uri": getattr(item.post, "uri", None),
                    "text": text,
                    "normalized_text": normalize_text(text),
                    "canonical_non_x_urls": canonical_non_x_urls,
                    "created_at": getattr(record, "created_at", None),
                })
            except Exception as e:
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
    except Exception as e:
        logging.warning(f"Could not fetch recent Bluesky posts for duplicate detection: {e}")
    return recent_posts
def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """
    Compare a candidate against pre-digested recent Bluesky posts.

    Returns (True, reason) when the candidate's canonical link already appears
    in a post, or when its normalized title is contained in a post's text and
    the links agree (when the candidate has one). Otherwise (False, None).
    """
    link = candidate["canonical_link"]
    title = candidate["normalized_title"]
    for post in recent_bsky_posts:
        urls = post["canonical_non_x_urls"]
        if link and link in urls:
            return True, "bsky:canonical_link"
        if title and title in post["normalized_text"]:
            if not link or link in urls:
                return True, "bsky:title_plus_link"
    return False, None
# --- Rich text builder ---
def make_rich(content):
    """
    Convert plain text into an atproto TextBuilder, attaching a link facet to
    every http(s) URL token and a tag facet to every #hashtag token, while
    re-emitting the original spacing and line breaks around them.
    """
    text_builder = client_utils.TextBuilder()
    content = clean_whitespace(content)
    lines = content.splitlines()
    for line_idx, line in enumerate(lines):
        if not line.strip():
            # Blank line: emit just the newline separator (except after the last line).
            if line_idx < len(lines) - 1:
                text_builder.text("\n")
            continue
        words = line.split(" ")
        for i, word in enumerate(words):
            if not word:
                # Empty token from consecutive spaces: preserve the space itself.
                if i < len(words) - 1:
                    text_builder.text(" ")
                continue
            cleaned_word = strip_trailing_url_punctuation(word)
            if cleaned_word.startswith("http://") or cleaned_word.startswith("https://"):
                # Link facet over the cleaned URL; stripped trailing punctuation
                # is re-emitted as plain text so the visible string is unchanged.
                text_builder.link(cleaned_word, cleaned_word)
                trailing = word[len(cleaned_word):]
                if trailing:
                    text_builder.text(trailing)
            elif cleaned_word.startswith("#") and len(cleaned_word) > 1:
                tag_name = cleaned_word[1:].rstrip(".,;:!?)'\"")
                if tag_name:
                    # Tag facet: display text keeps the '#', the facet value does not.
                    text_builder.tag(cleaned_word, tag_name)
                    trailing = word[len(cleaned_word):]
                    if trailing:
                        text_builder.text(trailing)
                else:
                    # '#' followed only by punctuation: plain text.
                    text_builder.text(word)
            else:
                text_builder.text(word)
            if i < len(words) - 1:
                text_builder.text(" ")
        if line_idx < len(lines) - 1:
            text_builder.text("\n")
    return text_builder
# --- Blob / image upload helpers ---
def get_rate_limit_wait_seconds(error_obj, default_delay):
    """
    Derive a wait time from a rate-limit error's ratelimit-reset header,
    clamped to [default_delay, BSKY_BLOB_UPLOAD_MAX_DELAY]. Falls back to
    default_delay when the header is absent or unreadable.
    """
    try:
        headers = getattr(error_obj, "headers", None)
        reset_value = (headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")) if headers else None
        if reset_value:
            wait_seconds = max(int(reset_value) - int(time.time()) + 1, default_delay)
            return min(wait_seconds, BSKY_BLOB_UPLOAD_MAX_DELAY)
    except Exception:
        pass
    return default_delay
def is_transient_blob_error(error_obj):
    """Heuristic: does this exception look like a retryable network/server hiccup?"""
    transient_signals = (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    )
    error_text = repr(error_obj)
    for signal in transient_signals:
        if signal in error_text:
            return True
    return False
def upload_blob_with_retry(client, binary_data, media_label="media"):
    """
    Upload a blob to Bluesky with two independent retry policies: exponential
    backoff for rate limits (honoring the server's ratelimit-reset header) and
    linear backoff for transient network/server errors. Returns the uploaded
    blob ref on success, or None when retries are exhausted or the error is
    not retryable.
    """
    last_exception = None
    transient_attempts = 0  # transient errors counted separately from the main attempt loop
    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob
        except Exception as e:
            last_exception = e
            error_text = str(e)
            is_rate_limited = "429" in error_text or "RateLimitExceeded" in error_text
            if is_rate_limited:
                # Exponential backoff, capped; the reset header may extend the wait.
                backoff_delay = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)
                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(
                        f"Blob upload rate-limited for {media_label}. "
                        f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue
                else:
                    logging.warning(f"Exhausted blob upload retries for {media_label}: {repr(e)}")
                    break
            if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
                transient_attempts += 1
                # Linear backoff: delay grows with each transient attempt.
                wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(
                    f"Transient blob upload failure for {media_label}: {repr(e)}. "
                    f"Retry {transient_attempts}/{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue
            # Neither rate-limited nor transient: give up immediately.
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            return None
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None
def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES):
    """
    Re-encode an image as a progressive JPEG no larger than max_bytes.

    Stage 1 caps the longest side at EXTERNAL_THUMB_MAX_DIMENSION and walks
    qualities down to EXTERNAL_THUMB_MIN_JPEG_QUALITY; stage 2 additionally
    shrinks through smaller target dimensions at low qualities. Returns the
    first encoding that fits, or None (also when Pillow is unavailable or the
    image cannot be processed).
    """
    if not PIL_AVAILABLE:
        return None

    def encode(pil_img, quality):
        # Serialize to an in-memory progressive JPEG at the given quality.
        buf = io.BytesIO()
        pil_img.save(buf, format="JPEG", quality=quality, optimize=True, progressive=True)
        return buf.getvalue()

    def shrink(pil_img, target_dim):
        # Scale down so the longest side is target_dim; no-op if already smaller.
        w, h = pil_img.size
        longest = max(w, h)
        if longest <= target_dim:
            return pil_img
        scale = target_dim / longest
        return pil_img.resize((max(1, int(w * scale)), max(1, int(h * scale))), Image.LANCZOS)

    try:
        with Image.open(io.BytesIO(image_bytes)) as decoded:
            img = decoded.convert("RGB")
        img = shrink(img, EXTERNAL_THUMB_MAX_DIMENSION)
        for quality in (85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY):
            data = encode(img, quality)
            if len(data) <= max_bytes:
                return data
        for target_dim in (1000, 900, 800, 700, 600):
            resized = shrink(img.copy(), target_dim)
            for quality in (60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY):
                data = encode(resized, quality)
                if len(data) <= max_bytes:
                    return data
    except Exception as e:
        logging.warning(f"Could not compress external thumbnail: {repr(e)}")
    return None
def get_external_thumb_blob_from_url(image_url, client, http_client):
    """Download a card thumbnail, compress it if oversized, and upload it as a blob."""
    try:
        response = http_client.get(image_url, timeout=HTTP_TIMEOUT, follow_redirects=True)
        if response.status_code != 200:
            logging.warning(f"Could not fetch external thumb {image_url}: HTTP {response.status_code}")
            return None
        upload_bytes = response.content
        if not upload_bytes:
            logging.warning(f"Could not fetch external thumb {image_url}: empty body")
            return None
        if len(upload_bytes) > EXTERNAL_THUMB_MAX_BYTES:
            upload_bytes = compress_external_thumb_to_limit(upload_bytes, EXTERNAL_THUMB_MAX_BYTES)
            if not upload_bytes:
                logging.warning("Could not compress external thumb to fit limit. Omitting thumbnail.")
                return None
        return upload_blob_with_retry(client, upload_bytes, media_label=f"external-thumb:{image_url}")
    except Exception as e:
        logging.warning(f"Could not fetch/upload external thumb {image_url}: {repr(e)}")
        return None
# --- Link metadata ---
def fetch_link_metadata(url, http_client):
    """Scrape OpenGraph/meta title, description and image URL from a page; {} on failure."""
    try:
        response = http_client.get(url, timeout=HTTP_TIMEOUT, follow_redirects=True)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")

        def meta_content(tag):
            # Value of the tag's content attribute, or None when absent.
            return tag["content"] if tag and tag.has_attr("content") else None

        title_tag = soup.find("meta", property="og:title") or soup.find("title")
        desc_tag = (
            soup.find("meta", property="og:description")
            or soup.find("meta", attrs={"name": "description"})
        )
        image_tag = (
            soup.find("meta", property="og:image")
            or soup.find("meta", attrs={"name": "twitter:image"})
        )
        title = meta_content(title_tag)
        if title is None:
            # Fall back to the <title> element's text when no content attribute exists.
            title = title_tag.text.strip() if title_tag and title_tag.text else ""
        return {
            "title": title,
            "description": meta_content(desc_tag) or "",
            "image": meta_content(image_tag),
        }
    except Exception as e:
        logging.warning(f"Could not fetch link metadata for {url}: {e}")
        return {}
def build_external_link_embed(url, fallback_title, client, http_client):
    """Build an external-link card embed (with optional thumbnail blob), or None when no metadata."""
    link_metadata = fetch_link_metadata(url, http_client)
    thumb_blob = None
    if link_metadata.get("image"):
        thumb_blob = get_external_thumb_blob_from_url(link_metadata["image"], client, http_client)
    if thumb_blob:
        logging.info("External link card thumbnail prepared successfully")
    else:
        logging.info("External link card will be posted without thumbnail")
    if not (link_metadata.get("title") or link_metadata.get("description") or thumb_blob):
        return None
    return models.AppBskyEmbedExternal.Main(
        external=models.AppBskyEmbedExternal.External(
            uri=url,
            title=link_metadata.get("title") or fallback_title or "Enllaç",
            description=link_metadata.get("description") or "",
            thumb=thumb_blob,
        )
    )
# --- Feed parsing helpers ---
def parse_entry_time(item):
    """Parse the first usable timestamp among published/updated/pubDate, else None."""
    for attr in ("published", "updated", "pubDate"):
        value = getattr(item, attr, None)
        if not value:
            continue
        try:
            return arrow.get(value)
        except Exception:
            continue
    return None
def build_candidates_from_feed(feed):
    """
    Turn feed entries into candidate dicts (cleaned title, canonical link,
    fingerprint, ready-made text variants), sorted oldest-first by publish time.
    """
    candidates = []
    for item in getattr(feed, "entries", []):
        try:
            title_text = process_title(getattr(item, "title", "") or "")
            link = canonicalize_url(getattr(item, "link", "") or "")
            if not title_text and not link:
                logging.info("Skipping feed item with no usable title and no link.")
                continue
            published_at = parse_entry_time(item)
            normalized_title = normalize_text(title_text)
            candidates.append({
                "item": item,
                "title_text": title_text,
                "normalized_title": normalized_title,
                "canonical_link": link,
                "published_at": published_at.isoformat() if published_at else None,
                "published_arrow": published_at,
                "entry_fingerprint": build_entry_fingerprint(normalized_title, link),
                "post_text_variants": build_post_text_variants(title_text, link),
            })
        except Exception as e:
            logging.warning(f"Failed to prepare feed entry candidate: {e}")
    # Undated entries sort first via the epoch fallback.
    candidates.sort(key=lambda c: c["published_arrow"] or arrow.get(0))
    return candidates
# --- Posting helpers ---
def is_probable_length_error(exc):
    """Heuristic: does the exception text suggest the post was rejected for its length?"""
    signals = (
        "TextTooLong",
        "text too long",
        "Invalid app.bsky.feed.post record",
        "string too long",
        "maxLength",
        "length",
    )
    lowered = repr(exc).lower()
    return any(signal.lower() in lowered for signal in signals)
def try_send_post_with_variants(client, text_variants, embed, post_lang):
    """
    Try to publish each text variant in order; return (result, variant) for
    the first success. Length-related failures fall through to the next
    variant; any other error is re-raised immediately.
    """
    last_exception = None
    total = len(text_variants)
    for idx, variant in enumerate(text_variants, start=1):
        logging.info(f"Trying post text variant {idx}/{total} (length={len(variant)})")
        try:
            rich_text = make_rich(variant)
            return client.send_post(text=rich_text, embed=embed, langs=[post_lang]), variant
        except Exception as e:
            last_exception = e
            logging.warning(f"Post variant {idx} failed: {repr(e)}")
            if not is_probable_length_error(e):
                raise
    if last_exception:
        raise last_exception
    raise RuntimeError("No text variants available to post.")
# --- Main ---
def main():
    """CLI entry point: log in, read the RSS feed, filter already-posted entries, publish the rest."""
    parser = argparse.ArgumentParser(description="Post RSS to Bluesky with JSON state tracking.")
    parser.add_argument("rss_feed", help="RSS feed URL")
    parser.add_argument("bsky_handle", help="Bluesky handle")
    parser.add_argument("bsky_username", help="Bluesky username")
    parser.add_argument("bsky_app_password", help="Bluesky app password")
    parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL")
    parser.add_argument("--lang", default="ca", help="Language code for the post")
    parser.add_argument("--state-path", default=STATE_PATH, help="Path to local JSON state file")
    args = parser.parse_args()
    feed_url = args.rss_feed
    bsky_handle = args.bsky_handle
    bsky_username = args.bsky_username
    bsky_password = args.bsky_app_password
    service_url = args.service
    post_lang = args.lang
    state_path = args.state_path
    # --- Login ---
    client = Client(base_url=service_url)
    backoff = 60
    # Retry login indefinitely with a growing, capped delay.
    while True:
        try:
            logging.info(f"Attempting login to server: {service_url} with user: {bsky_username}")
            client.login(bsky_username, bsky_password)
            logging.info(f"Login successful for user: {bsky_username}")
            break
        except Exception:
            logging.exception("Login exception")
            time.sleep(backoff)
            backoff = min(backoff + 60, 600)  # linear backoff, capped at 10 minutes
    state = load_state(state_path)
    recent_bsky_posts = get_recent_bsky_posts(client, bsky_handle, limit=DEDUPE_BSKY_LIMIT)
    logging.info(f"Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.")
    logging.info(f"Local state currently tracks {len(state.get('posted_entries', {}))} posted items.")
    # --- Parse feed ---
    response = httpx.get(feed_url, timeout=HTTP_TIMEOUT, follow_redirects=True)
    response.raise_for_status()
    # Decode the feed: detected charset first, then latin-1, then lossy utf-8.
    try:
        result = charset_normalizer.from_bytes(response.content).best()
        if not result or not hasattr(result, "text"):
            raise ValueError("Could not detect feed encoding.")
        feed_content = result.text
    except ValueError:
        logging.warning("Could not detect feed encoding with charset_normalizer. Trying latin-1.")
        try:
            feed_content = response.content.decode("latin-1")
        except UnicodeDecodeError:
            logging.warning("Could not decode with latin-1. Trying utf-8 with ignored errors.")
            feed_content = response.content.decode("utf-8", errors="ignore")
    feed = fastfeedparser.parse(feed_content)
    candidates = build_candidates_from_feed(feed)
    logging.info(f"Prepared {len(candidates)} feed entry candidates for duplicate comparison.")
    entries_to_post = []
    # Drop anything already recorded locally or already visible on Bluesky.
    for candidate in candidates:
        is_dup_state, reason_state = candidate_matches_state(candidate, state)
        if is_dup_state:
            logging.info(f"Skipping candidate due to local state duplicate match on: {reason_state}")
            continue
        is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
        if is_dup_bsky:
            logging.info(f"Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}")
            continue
        entries_to_post.append(candidate)
    logging.info(f"{len(entries_to_post)} entries remain after duplicate filtering.")
    if not entries_to_post:
        logging.info(" Execution finished: no new entries to publish.")
        return
    noves_entrades = 0  # count of newly published entries ("new entries" in Catalan)
    with httpx.Client() as http_client:
        for candidate in entries_to_post:
            title_text = candidate["title_text"]
            canonical_link = candidate["canonical_link"]
            text_variants = candidate["post_text_variants"]
            logging.info(f"Preparing to post RSS entry: {canonical_link or title_text}")
            embed = None
            if canonical_link:
                embed = build_external_link_embed(
                    canonical_link,
                    fallback_title=title_text or "Enllaç",
                    client=client,
                    http_client=http_client
                )
            try:
                post_result, posted_text = try_send_post_with_variants(
                    client=client,
                    text_variants=text_variants,
                    embed=embed,
                    post_lang=post_lang
                )
                bsky_uri = getattr(post_result, "uri", None)
                # Persist state after every successful post so a crash loses nothing.
                remember_posted_entry(state, candidate, posted_text=posted_text, bsky_uri=bsky_uri)
                state = prune_state(state, max_entries=5000)
                save_state(state, state_path)
                # Feed the in-memory dedupe list so later candidates in this run see this post.
                recent_bsky_posts.insert(0, {
                    "uri": bsky_uri,
                    "text": posted_text,
                    "normalized_text": normalize_text(posted_text),
                    "canonical_non_x_urls": {canonical_link} if canonical_link else set(),
                    "created_at": arrow.utcnow().isoformat(),
                })
                recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]
                noves_entrades += 1
                logging.info(f"Posted RSS entry to Bluesky: {canonical_link or title_text}")
                time.sleep(POST_RETRY_DELAY_SECONDS)
            except Exception:
                # A single failed entry does not abort the run.
                logging.exception(f"Failed to post RSS entry {canonical_link or title_text}")
    if noves_entrades > 0:
        logging.info(f"🎉 Execution finished: published {noves_entrades} new entries to Bluesky.")
    else:
        logging.info(" Execution finished: no new entries were published.")
if __name__ == "__main__":
    # Run only when executed as a script, not when imported.
    main()