959 lines
31 KiB
Python
959 lines
31 KiB
Python
import argparse
|
||
import arrow
|
||
import fastfeedparser
|
||
import logging
|
||
import re
|
||
import httpx
|
||
import time
|
||
import charset_normalizer
|
||
import sys
|
||
import os
|
||
import io
|
||
import json
|
||
import hashlib
|
||
import html
|
||
from urllib.parse import urlparse
|
||
from atproto import Client, client_utils, models
|
||
from bs4 import BeautifulSoup
|
||
|
||
# Pillow is an optional dependency. When the import fails, ``Image`` is bound
# to None and ``PIL_AVAILABLE`` is False so that thumbnail compression can be
# skipped instead of crashing at import time.
try:
    from PIL import Image
    PIL_AVAILABLE = True
except ImportError:
    Image = None
    PIL_AVAILABLE = False
|
||
|
||
# --- Configuration ---
# Default path of the JSON file that records already-posted entries.
STATE_PATH = "rss2bsky_state.json"
# Number of recent Bluesky posts fetched for duplicate detection.
DEDUPE_BSKY_LIMIT = 30
# Maximum text length (as measured by len()) used when truncating post text.
BSKY_TEXT_MAX_LENGTH = 275

# Limits applied to external link-card thumbnails before upload.
# Size ceiling in bytes; larger images are recompressed or dropped.
EXTERNAL_THUMB_MAX_BYTES = 950 * 1024
# Longest side, in pixels, after the first downscale step.
EXTERNAL_THUMB_MAX_DIMENSION = 1200
# Lowest JPEG quality tried while shrinking a thumbnail.
EXTERNAL_THUMB_MIN_JPEG_QUALITY = 40

# Blob upload retry policy.
# Upper bound on upload attempts for one blob.
BSKY_BLOB_UPLOAD_MAX_RETRIES = 5
# Base delay in seconds for rate-limit backoff; doubled on each attempt.
BSKY_BLOB_UPLOAD_BASE_DELAY = 10
# Cap in seconds applied to the rate-limit wait.
BSKY_BLOB_UPLOAD_MAX_DELAY = 300
# How many transient (timeout / 5xx-like) failures are retried per blob.
BSKY_BLOB_TRANSIENT_ERROR_RETRIES = 3
# Delay in seconds for transient failures; multiplied by the retry count.
BSKY_BLOB_TRANSIENT_ERROR_DELAY = 15

# Timeout value passed to every httpx request.
HTTP_TIMEOUT = 20
# Pause in seconds after each successful post.
POST_RETRY_DELAY_SECONDS = 2

# Thumbnail upload cooldown state
# Unix timestamp before which thumbnail uploads are skipped (0 = no cooldown).
THUMB_UPLOAD_COOLDOWN_UNTIL = 0
|
||
|
||
# --- Logging ---
# Send INFO-and-above records to stdout, each prefixed with a timestamp.
logging.basicConfig(
    format="%(asctime)s %(message)s",
    level=logging.INFO,
    stream=sys.stdout
)

# Warn once at startup when thumbnail compression cannot be performed.
if not PIL_AVAILABLE:
    logging.warning("Pillow is not installed. External card thumbnail compression is disabled.")
|
||
|
||
|
||
# --- Encoding / text helpers ---
|
||
def fix_encoding(text):
    """Repair UTF-8 text that was wrongly decoded as Latin-1 (mojibake).

    The string is turned back into bytes with Latin-1 and re-read as UTF-8.
    If either step is impossible the input is returned unchanged.
    """
    try:
        raw_bytes = text.encode("latin-1")
        repaired = raw_bytes.decode("utf-8")
    except (UnicodeEncodeError, UnicodeDecodeError):
        # Not mojibake (or not representable): keep the text as it is.
        return text
    return repaired
|
||
|
||
|
||
def desescapar_unicode(text):
    """Convert HTML character references in ``text`` to plain characters.

    Any failure is logged as a warning and the original value is returned.
    """
    try:
        unescaped = html.unescape(text)
    except Exception as e:
        logging.warning(f"Error unescaping unicode/html entities: {e}")
        unescaped = text
    return unescaped
|
||
|
||
|
||
def is_html(text):
    """Return True when ``text`` contains something shaped like a tag (``<...>``)."""
    haystack = text if text else ""
    return re.search(r'<.*?>', haystack) is not None
|
||
|
||
|
||
def strip_trailing_url_punctuation(url):
    """Trim surrounding whitespace and trailing punctuation from ``url``.

    Falsy input (``None`` or ``""``) is returned as-is.
    """
    if not url:
        return url
    trimmed = url.strip()
    # Trailing run of whitespace, ellipsis, sentence punctuation, closing
    # brackets and quotes that commonly sticks to a URL inside prose.
    trailing_junk = re.compile(r"[\s…\.,;:!?)\]\"']+$")
    return trailing_junk.sub("", trimmed)
|
||
|
||
|
||
def canonicalize_url(url):
    """Return ``url`` stripped of whitespace and trailing punctuation.

    Falsy input yields ``None``.
    """
    if url:
        return strip_trailing_url_punctuation(url.strip())
    return None
|
||
|
||
|
||
def clean_whitespace(text):
    """Normalize line endings and whitespace in ``text``.

    Every line is stripped, runs of three or more newlines collapse to a
    single blank line, and the result is stripped. Falsy input returns ``""``.
    """
    if not text:
        return ""
    # Handle CRLF before bare CR: mapping "\r" to "\n" directly would turn a
    # single Windows line ending into two newlines, i.e. a spurious blank line.
    unified = text.replace("\r\n", "\n").replace("\r", "\n")
    stripped_lines = (line.strip() for line in unified.splitlines())
    collapsed = re.sub(r"\n{3,}", "\n\n", "\n".join(stripped_lines))
    return collapsed.strip()
|
||
|
||
|
||
def normalize_text(text):
    """Return a comparison key: cleaned, single-spaced, lower-cased text."""
    cleaned = clean_whitespace(text)
    single_spaced = re.sub(r"\s+", " ", cleaned)
    return single_spaced.strip().lower()
|
||
|
||
|
||
def process_title(title):
    """Turn a raw feed title into clean plain text.

    HTML markup is removed when the title looks like HTML, character
    references are unescaped, Latin-1/UTF-8 mojibake is repaired and
    whitespace is normalized. On any error the raw title (or ``""``) is
    returned and a warning is logged.
    """
    try:
        if is_html(title):
            # The markup is already a str, so no ``from_encoding`` is passed:
            # BeautifulSoup ignores that argument for str input and emits a
            # warning every time it is supplied.
            title_text = BeautifulSoup(title, "html.parser").get_text().strip()
        else:
            title_text = (title or "").strip()

        title_text = desescapar_unicode(title_text)
        title_text = fix_encoding(title_text)
        title_text = clean_whitespace(title_text)
        return title_text
    except Exception as e:
        logging.warning(f"Error processing title: {e}")
        return title or ""
|
||
|
||
|
||
def truncate_text_safely(text, max_length=BSKY_TEXT_MAX_LENGTH):
    """Shorten ``text`` to at most ``max_length`` characters.

    Text that already fits is returned untouched. Otherwise it is cut to
    leave room for ``"..."``, preferably at the last space before the cut.
    """
    if len(text) <= max_length:
        return text

    head = text[:max_length - 3]
    cut = head.rfind(" ")
    if cut > 0:
        # Back up to the last space so the cut does not split a word.
        head = head[:cut]
    return head + "..."
|
||
|
||
|
||
def build_post_text_variants(title_text, link):
    """
    Build de-duplicated post text variants, ordered from best to worst.

    1. Full title + blank line + URL
    2. Title truncated to fit beside the URL + blank line + URL
    3. Full title only
    4. Truncated title only
    5. URL only

    Variants that are empty after whitespace cleanup, or that repeat an
    earlier variant, are left out.
    """
    title_text = clean_whitespace(title_text)
    link = canonicalize_url(link) or link or ""

    ordered = []
    already_added = set()

    def add_variant(text):
        candidate = clean_whitespace(text)
        if not candidate or candidate in already_added:
            return
        already_added.add(candidate)
        ordered.append(candidate)

    if title_text and link:
        add_variant(f"{title_text}\n\n{link}")

        # Space left for the title once the URL and the two newlines are counted.
        room_for_title = BSKY_TEXT_MAX_LENGTH - (len(link) + 2)
        if room_for_title > 10:
            short_title = truncate_text_safely(title_text, room_for_title)
            add_variant(f"{short_title}\n\n{link}")

    if title_text:
        add_variant(title_text)
        add_variant(truncate_text_safely(title_text))

    if link:
        add_variant(link)

    return ordered
|
||
|
||
|
||
# --- URL / duplicate helpers ---
|
||
def is_x_or_twitter_domain(url):
    """Return True when the host of ``url`` is an X/Twitter domain (or t.co)."""
    blocked_hosts = frozenset((
        "x.com",
        "www.x.com",
        "twitter.com",
        "www.twitter.com",
        "mobile.twitter.com",
        "t.co",
    ))
    try:
        host = urlparse(url).hostname
        return (host or "").lower() in blocked_hosts
    except Exception:
        # Anything that cannot be parsed is treated as "not X/Twitter".
        return False
|
||
|
||
|
||
def extract_urls_from_text(text):
    """Return every ``http://`` / ``https://`` run of non-whitespace in ``text``."""
    return re.findall(r"https?://[^\s]+", text) if text else []
|
||
|
||
|
||
def extract_non_x_urls_from_text(text):
    """Return the URLs found in ``text``, cleaned, excluding X/Twitter hosts."""
    trimmed_urls = (
        strip_trailing_url_punctuation(raw_url)
        for raw_url in extract_urls_from_text(text)
    )
    return [
        trimmed
        for trimmed in trimmed_urls
        if trimmed and not is_x_or_twitter_domain(trimmed)
    ]
|
||
|
||
|
||
def build_entry_fingerprint(normalized_title, canonical_link):
    """Return the SHA-256 hex digest of ``"<title>||<link>"`` (UTF-8 encoded).

    A falsy link contributes an empty string.
    """
    payload = "{}||{}".format(normalized_title, canonical_link or "").encode("utf-8")
    hasher = hashlib.sha256()
    hasher.update(payload)
    return hasher.hexdigest()
|
||
|
||
|
||
# --- Bluesky state helpers ---
|
||
def default_state():
    """Return a fresh, empty state dictionary."""
    state = {}
    state["version"] = 1
    state["posted_entries"] = {}
    state["posted_by_bsky_uri"] = {}
    state["updated_at"] = None
    return state
|
||
|
||
|
||
def load_state(state_path=STATE_PATH):
    """Load the JSON state file, falling back to an empty state.

    A missing file, unreadable/invalid JSON, or a top-level value that is not
    a dict all result in ``default_state()``. Keys missing from a loaded dict
    are filled in with their default values.
    """
    if not os.path.exists(state_path):
        logging.info(f"No state file found at {state_path}. Starting with empty state.")
        return default_state()

    try:
        with open(state_path, "r", encoding="utf-8") as f:
            loaded = json.load(f)

        if not isinstance(loaded, dict):
            logging.warning("State file invalid. Reinitializing.")
            return default_state()

        # Back-fill any key the file lacks, in the default state's key order.
        for key, default_value in default_state().items():
            loaded.setdefault(key, default_value)
        return loaded

    except Exception as e:
        logging.warning(f"Could not load state file {state_path}: {e}. Reinitializing.")
        return default_state()
|
||
|
||
|
||
def save_state(state, state_path=STATE_PATH):
    """Write ``state`` to ``state_path`` as JSON via a temporary file.

    ``state["updated_at"]` is refreshed first. The data is written to
    ``<state_path>.tmp`` and then moved over the target with ``os.replace``.
    Failures are logged as errors and not raised.
    """
    try:
        state["updated_at"] = arrow.utcnow().isoformat()
        temp_path = "{}.tmp".format(state_path)

        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, ensure_ascii=False, indent=2, sort_keys=True)

        os.replace(temp_path, state_path)
    except Exception as e:
        logging.error(f"Failed to save state file {state_path}: {e}")
    else:
        logging.info(f"State saved to {state_path}")
|
||
|
||
|
||
def prune_state(state, max_entries=5000):
    """Keep only the ``max_entries`` most recently posted entries in ``state``.

    Entries are ranked by their ``posted_at`` string (missing values sort as
    ``""``). The ``posted_by_bsky_uri`` index is filtered to the surviving
    keys. ``state`` is modified in place and returned.
    """
    entries = state.get("posted_entries", {})
    if len(entries) <= max_entries:
        return state

    newest_first = sorted(
        entries,
        key=lambda entry_key: entries[entry_key].get("posted_at") or "",
        reverse=True,
    )
    keep_keys = set(newest_first[:max_entries])

    # Rebuild both mappings, preserving their original insertion order.
    surviving_entries = {
        entry_key: record
        for entry_key, record in entries.items()
        if entry_key in keep_keys
    }
    surviving_uris = {
        bsky_uri: entry_key
        for bsky_uri, entry_key in state.get("posted_by_bsky_uri", {}).items()
        if entry_key in keep_keys
    }

    state["posted_entries"] = surviving_entries
    state["posted_by_bsky_uri"] = surviving_uris
    return state
|
||
|
||
|
||
def remember_posted_entry(state, candidate, posted_text, bsky_uri=None):
    """Record ``candidate`` in ``state`` as posted.

    The record is stored under the canonical link, or under
    ``"fp:<fingerprint>"`` when there is no link. When ``bsky_uri`` is given,
    it is also indexed in ``state["posted_by_bsky_uri"]``.
    """
    canonical_link = candidate.get("canonical_link")
    fingerprint_key = f"fp:{candidate['entry_fingerprint']}"
    state_key = canonical_link if canonical_link else fingerprint_key

    state["posted_entries"][state_key] = {
        "canonical_link": canonical_link,
        "title_text": candidate["title_text"],
        "normalized_title": candidate["normalized_title"],
        "entry_fingerprint": candidate["entry_fingerprint"],
        "post_text": posted_text,
        "published_at": candidate.get("published_at"),
        "bsky_uri": bsky_uri,
        "posted_at": arrow.utcnow().isoformat(),
    }

    if not bsky_uri:
        return
    state["posted_by_bsky_uri"][bsky_uri] = state_key
|
||
|
||
|
||
def candidate_matches_state(candidate, state):
    """Check whether ``candidate`` is already recorded in the local state.

    Returns ``(True, reason)`` for the first matching rule, tried in order:
    canonical link used as a state key, equal entry fingerprint, equal
    normalized title (with an equal link when the candidate has one).
    Returns ``(False, None)`` when nothing matches.
    """
    link = candidate["canonical_link"]
    fingerprint = candidate["entry_fingerprint"]
    title = candidate["normalized_title"]

    posted = state.get("posted_entries", {})

    if link and link in posted:
        return True, "state:canonical_link"

    if any(record.get("entry_fingerprint") == fingerprint for record in posted.values()):
        return True, "state:entry_fingerprint"

    def same_title(record):
        # A title match only counts when the candidate has no link, or the
        # stored link is the same one.
        return record.get("normalized_title") == title and (
            not link or record.get("canonical_link") == link
        )

    if any(same_title(record) for record in posted.values()):
        return True, "state:normalized_title"

    return False, None
|
||
|
||
|
||
# --- Bluesky recent-post dedupe ---
|
||
def extract_urls_from_facets(record):
    """Collect the truthy ``uri`` attribute of every facet feature on ``record``.

    Missing ``facets`` / ``features`` attributes are treated as empty. If an
    error interrupts the walk, it is logged at debug level and the URIs
    gathered so far are returned.
    """
    collected = []

    try:
        for facet in getattr(record, "facets", None) or []:
            for feature in getattr(facet, "features", None) or []:
                uri = getattr(feature, "uri", None)
                if not uri:
                    continue
                collected.append(uri)
    except Exception as e:
        logging.debug(f"Could not extract facet URLs: {e}")

    return collected
|
||
|
||
|
||
def get_recent_bsky_posts(client, handle, limit=DEDUPE_BSKY_LIMIT):
    """Fetch the author's recent posts in a form suited to duplicate checks.

    Feed items carrying a ``reason`` and records carrying a ``reply`` are
    skipped. Each remaining post becomes a dict with its URI, raw text,
    normalized text, the set of canonical non-X/Twitter URLs found in the
    text and facets, and its creation time. A failure on a single item is
    logged at debug level; a failure of the fetch itself is logged as a
    warning, and whatever was collected so far is returned.
    """

    def summarize(feed_item):
        # Returns None for items that should not take part in deduplication.
        if feed_item.reason is not None:
            return None

        record = feed_item.post.record
        if getattr(record, "reply", None) is not None:
            return None

        text = getattr(record, "text", "") or ""
        normalized_text = normalize_text(text)

        found_urls = [
            *extract_non_x_urls_from_text(text),
            *extract_urls_from_facets(record),
        ]
        canonical_candidates = (
            canonicalize_url(found)
            for found in found_urls
            if not is_x_or_twitter_domain(found)
        )
        canonical_non_x_urls = {c for c in canonical_candidates if c}

        return {
            "uri": getattr(feed_item.post, "uri", None),
            "text": text,
            "normalized_text": normalized_text,
            "canonical_non_x_urls": canonical_non_x_urls,
            "created_at": getattr(record, "created_at", None),
        }

    summaries = []

    try:
        timeline = client.get_author_feed(handle, limit=limit)

        for feed_item in timeline.feed:
            try:
                summary = summarize(feed_item)
            except Exception as e:
                logging.debug(f"Skipping one Bluesky feed item during dedupe fetch: {e}")
                continue
            if summary is not None:
                summaries.append(summary)

    except Exception as e:
        logging.warning(f"Could not fetch recent Bluesky posts for duplicate detection: {e}")

    return summaries
|
||
|
||
|
||
def candidate_matches_existing_bsky(candidate, recent_bsky_posts):
    """Check ``candidate`` against recently fetched Bluesky posts.

    Returns ``(True, "bsky:canonical_link")`` when a post already carries the
    candidate's link, ``(True, "bsky:title_plus_link")`` when a link-less
    candidate's normalized title appears inside a post's normalized text,
    and ``(False, None)`` otherwise.
    """
    link = candidate["canonical_link"]
    title = candidate["normalized_title"]

    for post in recent_bsky_posts:
        known_urls = post["canonical_non_x_urls"]
        post_text = post["normalized_text"]

        if link and link in known_urls:
            return True, "bsky:canonical_link"

        # Past this point a truthy link is known to be absent from
        # ``known_urls``, so a title match can only count without a link.
        title_seen = bool(title) and title in post_text
        if title_seen and not link:
            return True, "bsky:title_plus_link"

    return False, None
|
||
|
||
|
||
# --- Rich text builder ---
|
||
def make_rich(content):
    """Build a ``TextBuilder`` for ``content`` with links and hashtags marked.

    The content is whitespace-cleaned, then emitted line by line and word by
    word. Words starting with ``http://`` / ``https://`` become links, words
    starting with ``#`` become tags; punctuation trailing either is emitted
    as plain text. Everything else is plain text. Single spaces between
    words and newlines between lines are preserved.
    """
    builder = client_utils.TextBuilder()
    lines = clean_whitespace(content).splitlines()

    def emit_word(word):
        core = strip_trailing_url_punctuation(word)
        suffix = word[len(core):]

        if core.startswith(("http://", "https://")):
            builder.link(core, core)
            if suffix:
                builder.text(suffix)
            return

        if core.startswith("#") and len(core) > 1:
            tag_name = core[1:].rstrip(".,;:!?)'\"…")
            if tag_name:
                builder.tag(core, tag_name)
                if suffix:
                    builder.text(suffix)
                return

        # Ordinary word, or a "#" with nothing usable after it.
        builder.text(word)

    final_line = len(lines) - 1
    for line_no, line in enumerate(lines):
        if line.strip():
            words = line.split(" ")
            final_word = len(words) - 1
            for word_no, word in enumerate(words):
                if word:
                    emit_word(word)
                # Empty "words" come from consecutive spaces; the separator
                # is still emitted for them.
                if word_no < final_word:
                    builder.text(" ")

        if line_no < final_line:
            builder.text("\n")

    return builder
|
||
|
||
|
||
# --- Blob / image upload helpers ---
|
||
def get_rate_limit_wait_seconds(error_obj, default_delay):
    """Return how long to wait after a rate-limit error, in seconds.

    When the error exposes a ``ratelimit-reset`` / ``RateLimit-Reset`` header,
    the wait is the time until that reset plus one second, never less than
    ``default_delay`` and never more than ``BSKY_BLOB_UPLOAD_MAX_DELAY``.
    Without a usable header (or on any error) ``default_delay`` is returned.
    """
    try:
        headers = getattr(error_obj, "headers", None)
        reset_value = None
        if headers:
            reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
        if reset_value:
            now_ts = int(time.time())
            seconds_until_reset = int(reset_value) - now_ts + 1
            return min(
                max(seconds_until_reset, default_delay),
                BSKY_BLOB_UPLOAD_MAX_DELAY,
            )
    except Exception:
        pass

    return default_delay
|
||
|
||
|
||
def get_rate_limit_reset_timestamp(error_obj):
    """Return the ``ratelimit-reset`` header of ``error_obj`` as an int.

    ``None`` is returned when the error has no headers, the header is absent
    or empty, or its value cannot be converted to an integer.
    """
    try:
        headers = getattr(error_obj, "headers", None)
        if not headers:
            return None
        raw_reset = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset")
        return int(raw_reset) if raw_reset else None
    except Exception:
        return None
|
||
|
||
|
||
def is_transient_blob_error(error_obj):
    """Return True when ``repr(error_obj)`` mentions a timeout, connection
    problem or a 502/503/504 status."""
    described = repr(error_obj)
    for marker in (
        "InvokeTimeoutError",
        "ReadTimeout",
        "WriteTimeout",
        "TimeoutException",
        "RemoteProtocolError",
        "ConnectError",
        "503",
        "502",
        "504",
    ):
        if marker in described:
            return True
    return False
|
||
|
||
|
||
def is_rate_limited_error(error_obj):
    """Return True when ``str(error_obj)`` mentions ``429`` or ``RateLimitExceeded``."""
    message = str(error_obj)
    return any(marker in message for marker in ("429", "RateLimitExceeded"))
|
||
|
||
|
||
def activate_thumb_upload_cooldown_from_error(error_obj):
    """Extend the global thumbnail-upload cooldown after a rate-limit error.

    The cooldown runs until the reset timestamp carried by the error, or for
    one hour from now when the error carries none. The cooldown is only ever
    moved later, and a warning is logged when it moves.
    """
    global THUMB_UPLOAD_COOLDOWN_UNTIL

    reset_ts = get_rate_limit_reset_timestamp(error_obj)

    if not reset_ts:
        # No usable reset header: back off for a fixed hour.
        fallback_reset = int(time.time()) + 3600
        if fallback_reset > THUMB_UPLOAD_COOLDOWN_UNTIL:
            THUMB_UPLOAD_COOLDOWN_UNTIL = fallback_reset
            logging.warning("Thumbnail uploads disabled temporarily for 1 hour due to rate limiting.")
        return

    if reset_ts > THUMB_UPLOAD_COOLDOWN_UNTIL:
        THUMB_UPLOAD_COOLDOWN_UNTIL = reset_ts
        reset_label = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_ts))
        logging.warning(
            "Thumbnail uploads disabled until rate-limit reset at "
            f"{reset_label}."
        )
|
||
|
||
|
||
def is_thumb_upload_cooldown_active():
    """Return True while the current time is before the cooldown deadline."""
    now_ts = int(time.time())
    return THUMB_UPLOAD_COOLDOWN_UNTIL > now_ts
|
||
|
||
|
||
def upload_blob_with_retry(client, binary_data, media_label="media", optional=False, cooldown_on_rate_limit=False):
    """Upload ``binary_data`` as a blob, retrying on rate limits and transient errors.

    Args:
        client: Object whose ``upload_blob(binary_data)`` returns a result
            with a ``blob`` attribute.
        binary_data: Payload handed to ``client.upload_blob``.
        media_label: Label used in log messages only.
        optional: Together with ``cooldown_on_rate_limit``, makes a
            rate-limited upload give up immediately instead of retrying.
        cooldown_on_rate_limit: When True, a rate-limit error also activates
            the global thumbnail-upload cooldown.

    Returns:
        The uploaded blob, or ``None`` when the upload could not be completed.
        Exceptions raised by the upload are logged, never propagated.
    """
    last_exception = None
    # Transient failures have their own budget, separate from ``attempt``,
    # although each one still consumes an iteration of the loop below.
    transient_attempts = 0

    for attempt in range(1, BSKY_BLOB_UPLOAD_MAX_RETRIES + 1):
        try:
            result = client.upload_blob(binary_data)
            return result.blob

        except Exception as e:
            last_exception = e
            is_rate_limited = is_rate_limited_error(e)

            if is_rate_limited:
                if cooldown_on_rate_limit:
                    activate_thumb_upload_cooldown_from_error(e)

                # Exponential backoff (base * 2^(attempt-1)), capped; the
                # server-provided reset time may lengthen the wait further.
                backoff_delay = min(
                    BSKY_BLOB_UPLOAD_BASE_DELAY * (2 ** (attempt - 1)),
                    BSKY_BLOB_UPLOAD_MAX_DELAY
                )
                wait_seconds = get_rate_limit_wait_seconds(e, backoff_delay)

                # Optional media is not worth waiting for: drop it right away.
                if optional and cooldown_on_rate_limit:
                    logging.warning(
                        f"Optional blob upload rate-limited for {media_label}. "
                        f"Skipping remaining retries and omitting optional media."
                    )
                    return None

                if attempt < BSKY_BLOB_UPLOAD_MAX_RETRIES:
                    logging.warning(
                        f"Blob upload rate-limited for {media_label}. "
                        f"Retry {attempt}/{BSKY_BLOB_UPLOAD_MAX_RETRIES} after {wait_seconds}s."
                    )
                    time.sleep(wait_seconds)
                    continue
                else:
                    logging.warning(f"Exhausted blob upload retries for {media_label}: {repr(e)}")
                    break

            if is_transient_blob_error(e) and transient_attempts < BSKY_BLOB_TRANSIENT_ERROR_RETRIES:
                transient_attempts += 1
                # Linear backoff: the delay grows with each transient retry.
                wait_seconds = BSKY_BLOB_TRANSIENT_ERROR_DELAY * transient_attempts
                logging.warning(
                    f"Transient blob upload failure for {media_label}: {repr(e)}. "
                    f"Retry {transient_attempts}/{BSKY_BLOB_TRANSIENT_ERROR_RETRIES} after {wait_seconds}s."
                )
                time.sleep(wait_seconds)
                continue

            # Neither rate-limited nor a retryable transient error: give up.
            logging.warning(f"Could not upload {media_label}: {repr(e)}")
            return None

    # Reached when the loop ends without a successful upload (retries used up).
    logging.warning(f"Could not upload {media_label}: {repr(last_exception)}")
    return None
|
||
|
||
|
||
def compress_external_thumb_to_limit(image_bytes, max_bytes=EXTERNAL_THUMB_MAX_BYTES):
    """Re-encode an image as JPEG so that it fits within ``max_bytes``.

    The image is converted to RGB and scaled so its longest side is at most
    ``EXTERNAL_THUMB_MAX_DIMENSION``. A descending list of JPEG qualities is
    tried first; if none fits, progressively smaller dimensions are tried,
    each with its own quality list. Returns the first encoding that fits, or
    ``None`` when Pillow is unavailable, nothing fits, or an error occurs
    (logged as a warning).
    """
    if not PIL_AVAILABLE:
        return None

    def shrink_to(image, longest_side):
        # Scale down proportionally; images already small enough pass through.
        width, height = image.size
        current_longest = max(width, height)
        if current_longest <= longest_side:
            return image
        scale = longest_side / current_longest
        new_size = (max(1, int(width * scale)), max(1, int(height * scale)))
        return image.resize(new_size, Image.LANCZOS)

    def first_fitting_jpeg(image, qualities):
        for quality in qualities:
            buffer = io.BytesIO()
            image.save(buffer, format="JPEG", quality=quality, optimize=True, progressive=True)
            encoded = buffer.getvalue()
            if len(encoded) <= max_bytes:
                return encoded
        return None

    try:
        with Image.open(io.BytesIO(image_bytes)) as img:
            base = shrink_to(img.convert("RGB"), EXTERNAL_THUMB_MAX_DIMENSION)

            fitted = first_fitting_jpeg(
                base, [85, 75, 65, 55, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]
            )
            if fitted is not None:
                return fitted

            for target_dim in [1000, 900, 800, 700, 600]:
                smaller = shrink_to(base.copy(), target_dim)
                fitted = first_fitting_jpeg(
                    smaller, [60, 50, 45, EXTERNAL_THUMB_MIN_JPEG_QUALITY]
                )
                if fitted is not None:
                    return fitted

    except Exception as e:
        logging.warning(f"Could not compress external thumbnail: {repr(e)}")

    return None
|
||
|
||
|
||
def get_external_thumb_blob_from_url(image_url, client, http_client):
    """Download ``image_url`` and upload it as a link-card thumbnail blob.

    Nothing is attempted while the thumbnail-upload cooldown is active.
    Images larger than ``EXTERNAL_THUMB_MAX_BYTES`` are compressed first and
    dropped if compression fails. Returns the uploaded blob, or ``None`` on
    any failure; failures are logged and never raised.
    """
    if is_thumb_upload_cooldown_active():
        reset_str = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(THUMB_UPLOAD_COOLDOWN_UNTIL))
        logging.info(f"Skipping external thumbnail upload due to active cooldown until {reset_str}")
        return None

    try:
        response = http_client.get(image_url, timeout=HTTP_TIMEOUT, follow_redirects=True)
        if response.status_code != 200:
            logging.warning(f"Could not fetch external thumb {image_url}: HTTP {response.status_code}")
            return None

        payload = response.content
        if not payload:
            logging.warning(f"Could not fetch external thumb {image_url}: empty body")
            return None

        if len(payload) > EXTERNAL_THUMB_MAX_BYTES:
            payload = compress_external_thumb_to_limit(payload, EXTERNAL_THUMB_MAX_BYTES)
            if not payload:
                logging.warning("Could not compress external thumb to fit limit. Omitting thumbnail.")
                return None

        # The thumbnail is optional, so a rate limit skips it and starts the
        # cooldown instead of blocking the post.
        return upload_blob_with_retry(
            client,
            payload,
            media_label=f"external-thumb:{image_url}",
            optional=True,
            cooldown_on_rate_limit=True
        )

    except Exception as e:
        logging.warning(f"Could not fetch/upload external thumb {image_url}: {repr(e)}")
        return None
|
||
|
||
|
||
# --- Link metadata ---
|
||
def fetch_link_metadata(url, http_client):
    """Fetch ``url`` and extract link-card metadata from its HTML.

    Args:
        url: Page to fetch.
        http_client: Client whose ``get(url, timeout=..., follow_redirects=...)``
            returns a response with ``raise_for_status()`` and ``text``.

    Returns:
        A dict with ``"title"`` (str), ``"description"`` (str) and
        ``"image"`` (absolute URL or ``None``). An empty dict is returned
        when the page cannot be fetched or parsed; the error is logged.
    """
    # Imported locally: the module-level import only brings in ``urlparse``.
    from urllib.parse import urljoin

    def meta_content(tag):
        # A <meta> tag is only useful when it actually carries ``content``.
        if tag is not None and tag.has_attr("content"):
            return tag["content"]
        return None

    try:
        r = http_client.get(url, timeout=HTTP_TIMEOUT, follow_redirects=True)
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")

        # Prefer Open Graph values, but fall through to the next source when
        # the preferred tag exists without usable content.
        title = meta_content(soup.find("meta", property="og:title"))
        if not title:
            title_tag = soup.find("title")
            title = title_tag.text.strip() if title_tag is not None and title_tag.text else ""

        description = (
            meta_content(soup.find("meta", property="og:description"))
            or meta_content(soup.find("meta", attrs={"name": "description"}))
            or ""
        )

        image = (
            meta_content(soup.find("meta", property="og:image"))
            or meta_content(soup.find("meta", attrs={"name": "twitter:image"}))
        )
        if image:
            # Pages may declare a relative image path; resolve it against the
            # final (post-redirect) page URL so it can be downloaded later.
            base_url = str(getattr(r, "url", None) or url)
            image = urljoin(base_url, image.strip())

        return {
            "title": title,
            "description": description,
            "image": image or None,
        }
    except Exception as e:
        logging.warning(f"Could not fetch link metadata for {url}: {e}")
        return {}
|
||
|
||
|
||
def build_external_link_embed(url, fallback_title, client, http_client):
    """Build an external link-card embed for ``url``.

    Metadata is fetched from the page; when it names an image, a thumbnail
    upload is attempted. Returns an ``AppBskyEmbedExternal.Main`` when a
    title, a description or a thumbnail is available, otherwise ``None``.
    """
    metadata = fetch_link_metadata(url, http_client)

    thumb_blob = None
    image_url = metadata.get("image")
    if image_url:
        thumb_blob = get_external_thumb_blob_from_url(metadata["image"], client, http_client)
        if thumb_blob:
            logging.info("External link card thumbnail prepared successfully")
        else:
            logging.info("External link card will be posted without thumbnail")

    card_title = metadata.get("title")
    card_description = metadata.get("description")
    if not (card_title or card_description or thumb_blob):
        # Nothing to show in a card.
        return None

    external = models.AppBskyEmbedExternal.External(
        uri=url,
        title=card_title or fallback_title or "Enllaç",
        description=card_description or "",
        thumb=thumb_blob,
    )
    return models.AppBskyEmbedExternal.Main(external=external)
|
||
|
||
|
||
# --- Feed parsing helpers ---
|
||
def parse_entry_time(item):
    """Return the entry's timestamp parsed with ``arrow.get``, or ``None``.

    The ``published``, ``updated`` and ``pubDate`` attributes are tried in
    that order; empty or unparseable values are skipped.
    """
    raw_values = [
        getattr(item, attr_name, None)
        for attr_name in ("published", "updated", "pubDate")
    ]

    for raw in raw_values:
        if not raw:
            continue
        try:
            return arrow.get(raw)
        except Exception:
            continue

    return None
|
||
|
||
|
||
def build_candidates_from_feed(feed):
    """Turn the entries of a parsed feed into post candidates, oldest first.

    Each candidate dict holds the original item, the cleaned and normalized
    title, the canonical link, the publication time (ISO string and arrow
    object), a fingerprint and the post text variants. Entries with neither
    title nor link are skipped; entries that fail to prepare are logged and
    skipped. Entries without a date sort as the Unix epoch.
    """

    def prepare(item):
        # Returns None for entries that carry nothing postable.
        title_text = process_title(getattr(item, "title", "") or "")
        link = canonicalize_url(getattr(item, "link", "") or "")
        published_at = parse_entry_time(item)

        if not (title_text or link):
            logging.info("Skipping feed item with no usable title and no link.")
            return None

        normalized_title = normalize_text(title_text)
        entry_fingerprint = build_entry_fingerprint(normalized_title, link)

        return {
            "item": item,
            "title_text": title_text,
            "normalized_title": normalized_title,
            "canonical_link": link,
            "published_at": published_at.isoformat() if published_at else None,
            "published_arrow": published_at,
            "entry_fingerprint": entry_fingerprint,
            "post_text_variants": build_post_text_variants(title_text, link),
        }

    prepared_entries = []

    for item in getattr(feed, "entries", []):
        try:
            prepared = prepare(item)
        except Exception as e:
            logging.warning(f"Failed to prepare feed entry candidate: {e}")
            continue
        if prepared is not None:
            prepared_entries.append(prepared)

    prepared_entries.sort(key=lambda c: c["published_arrow"] or arrow.get(0))
    return prepared_entries
|
||
|
||
|
||
# --- Posting helpers ---
|
||
def is_probable_length_error(exc):
    """Return True when ``repr(exc)`` looks like a text-length rejection.

    The comparison is case-insensitive and matches any of a fixed list of
    markers.
    """
    haystack = repr(exc).lower()
    markers = (
        "TextTooLong",
        "text too long",
        "Invalid app.bsky.feed.post record",
        "string too long",
        "maxLength",
        "length",
    )
    for marker in markers:
        if marker.lower() in haystack:
            return True
    return False
|
||
|
||
|
||
def try_send_post_with_variants(client, text_variants, embed, post_lang):
    """Post the first text variant that the server accepts.

    Variants are tried in order. A failure that looks like a length problem
    moves on to the next variant; any other failure is re-raised at once.

    Returns:
        ``(result, variant)``: the ``send_post`` result and the text used.

    Raises:
        The last exception when every variant failed, or ``RuntimeError``
        when ``text_variants`` is empty.
    """
    failure = None

    for position, variant in enumerate(text_variants, start=1):
        try:
            logging.info(f"Trying post text variant {position}/{len(text_variants)} (length={len(variant)})")
            result = client.send_post(text=make_rich(variant), embed=embed, langs=[post_lang])
        except Exception as e:
            failure = e
            logging.warning(f"Post variant {position} failed: {repr(e)}")

            if not is_probable_length_error(e):
                raise
        else:
            return result, variant

    if failure:
        raise failure

    raise RuntimeError("No text variants available to post.")
|
||
|
||
|
||
# --- Main ---
|
||
def main():
    """Command-line entry point: post new RSS entries to Bluesky.

    Steps: parse arguments, log in (retrying until it succeeds), load the
    local state and the account's recent posts, download and parse the feed,
    drop entries already posted, then post the rest one by one, saving the
    state after every successful post.

    Raises:
        httpx.HTTPError: when the feed itself cannot be downloaded.
    """
    parser = argparse.ArgumentParser(description="Post RSS to Bluesky with JSON state tracking.")
    parser.add_argument("rss_feed", help="RSS feed URL")
    parser.add_argument("bsky_handle", help="Bluesky handle")
    parser.add_argument("bsky_username", help="Bluesky username")
    parser.add_argument("bsky_app_password", help="Bluesky app password")
    parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL")
    parser.add_argument("--lang", default="ca", help="Language code for the post")
    parser.add_argument("--state-path", default=STATE_PATH, help="Path to local JSON state file")
    args = parser.parse_args()

    feed_url = args.rss_feed
    bsky_handle = args.bsky_handle
    bsky_username = args.bsky_username
    bsky_password = args.bsky_app_password
    service_url = args.service
    post_lang = args.lang
    state_path = args.state_path

    client = Client(base_url=service_url)

    # Retry the login forever, waiting 60s longer each time up to 10 minutes.
    backoff = 60
    while True:
        try:
            logging.info(f"Attempting login to server: {service_url} with user: {bsky_username}")
            client.login(bsky_username, bsky_password)
            logging.info(f"Login successful for user: {bsky_username}")
            break
        except Exception:
            logging.exception("Login exception")
            time.sleep(backoff)
            backoff = min(backoff + 60, 600)

    state = load_state(state_path)
    recent_bsky_posts = get_recent_bsky_posts(client, bsky_handle, limit=DEDUPE_BSKY_LIMIT)

    logging.info(f"Loaded {len(recent_bsky_posts)} recent Bluesky posts for duplicate detection.")
    logging.info(f"Local state currently tracks {len(state.get('posted_entries', {}))} posted items.")

    response = httpx.get(feed_url, timeout=HTTP_TIMEOUT, follow_redirects=True)
    response.raise_for_status()

    try:
        result = charset_normalizer.from_bytes(response.content).best()
        # NOTE(review): confirm that the match object really exposes ``.text``;
        # if it does not, this branch always raises and the feed is always
        # decoded as latin-1 below.
        if not result or not hasattr(result, "text"):
            raise ValueError("Could not detect feed encoding.")
        feed_content = result.text
    except ValueError:
        logging.warning("Could not detect feed encoding with charset_normalizer. Trying latin-1.")
        # latin-1 maps every byte value to a code point, so this decode cannot
        # raise UnicodeDecodeError and needs no further fallback.
        feed_content = response.content.decode("latin-1")

    feed = fastfeedparser.parse(feed_content)
    candidates = build_candidates_from_feed(feed)

    logging.info(f"Prepared {len(candidates)} feed entry candidates for duplicate comparison.")

    entries_to_post = []
    for candidate in candidates:
        is_dup_state, reason_state = candidate_matches_state(candidate, state)
        if is_dup_state:
            logging.info(f"Skipping candidate due to local state duplicate match on: {reason_state}")
            continue

        is_dup_bsky, reason_bsky = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
        if is_dup_bsky:
            logging.info(f"Skipping candidate due to recent Bluesky duplicate match on: {reason_bsky}")
            continue

        entries_to_post.append(candidate)

    logging.info(f"{len(entries_to_post)} entries remain after duplicate filtering.")

    if not entries_to_post:
        logging.info("ℹ️ Execution finished: no new entries to publish.")
        return

    noves_entrades = 0

    with httpx.Client() as http_client:
        for candidate in entries_to_post:
            title_text = candidate["title_text"]
            canonical_link = candidate["canonical_link"]
            text_variants = candidate["post_text_variants"]

            # The first filter ran before anything was posted. Check again
            # against the state and recent posts updated during this run, so
            # a feed that lists the same item twice is only posted once.
            is_dup_run, reason_run = candidate_matches_state(candidate, state)
            if not is_dup_run:
                is_dup_run, reason_run = candidate_matches_existing_bsky(candidate, recent_bsky_posts)
            if is_dup_run:
                logging.info(f"Skipping candidate already published during this run (match on: {reason_run})")
                continue

            logging.info(f"Preparing to post RSS entry: {canonical_link or title_text}")

            embed = None
            if canonical_link:
                embed = build_external_link_embed(
                    canonical_link,
                    fallback_title=title_text or "Enllaç",
                    client=client,
                    http_client=http_client
                )

            try:
                post_result, posted_text = try_send_post_with_variants(
                    client=client,
                    text_variants=text_variants,
                    embed=embed,
                    post_lang=post_lang
                )

                bsky_uri = getattr(post_result, "uri", None)

                # Persist right away so a later crash cannot cause a re-post.
                remember_posted_entry(state, candidate, posted_text=posted_text, bsky_uri=bsky_uri)
                state = prune_state(state, max_entries=5000)
                save_state(state, state_path)

                recent_bsky_posts.insert(0, {
                    "uri": bsky_uri,
                    "text": posted_text,
                    "normalized_text": normalize_text(posted_text),
                    "canonical_non_x_urls": {canonical_link} if canonical_link else set(),
                    "created_at": arrow.utcnow().isoformat(),
                })
                recent_bsky_posts = recent_bsky_posts[:DEDUPE_BSKY_LIMIT]

                noves_entrades += 1
                logging.info(f"Posted RSS entry to Bluesky: {canonical_link or title_text}")
                time.sleep(POST_RETRY_DELAY_SECONDS)

            except Exception:
                # One failed entry must not stop the remaining ones.
                logging.exception(f"Failed to post RSS entry {canonical_link or title_text}")

    if noves_entrades > 0:
        logging.info(f"🎉 Execution finished: published {noves_entrades} new entries to Bluesky.")
    else:
        logging.info("ℹ️ Execution finished: no new entries were published.")
|
||
|
||
|
||
# Run the poster only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()