chore: update and merge gitignore

This commit is contained in:
Guillem Hernandez Sola
2026-03-29 17:44:14 +02:00
commit 57bf1aab81
57 changed files with 3124 additions and 0 deletions

227
rss2bsky.py Normal file
View File

@@ -0,0 +1,227 @@
import argparse
import arrow
import fastfeedparser
import logging
import re
import httpx
import time
import charset_normalizer # Per detectar la codificació del feed
import sys # Afegit per enviar els logs a la pantalla
from atproto import Client, client_utils, models
from bs4 import BeautifulSoup
import html # Per desescapar entitats HTML
# --- Logging ---
# Ara envia els registres a la pantalla (stdout) en lloc d'un fitxer
logging.basicConfig(
format="%(asctime)s %(message)s",
level=logging.INFO, # Nivell DEBUG per veure més detalls durant el test
stream=sys.stdout
)
# --- Funció per corregir problemes de codificació ---
def fix_encoding(text):
try:
# Intenta decodificar i reencodificar a UTF-8
return text.encode("latin-1").decode("utf-8")
except (UnicodeEncodeError, UnicodeDecodeError):
logging.warning(f"Error corregint codificació: {text}")
return text # Retorna el text original si hi ha un error
# --- Funció per desescapar caràcters unicode ---
def desescapar_unicode(text):
try:
return html.unescape(text) # Utilitza html.unescape per gestionar HTML entities
except Exception as e:
logging.warning(f"Error desescapant unicode: {e}")
return text # Retorna el text original si hi ha un error
# --- Funció per processar el títol ---
def process_title(title):
try:
if is_html(title):
title_text = BeautifulSoup(title, "html.parser", from_encoding="utf-8").get_text().strip()
else:
title_text = title.strip()
title_text = desescapar_unicode(title_text) # Desescapar HTML entities
title_text = fix_encoding(title_text) # Corregir problemes de codificació
return title_text
except Exception as e:
logging.warning(f"Error processant el títol: {e}")
return title
def fetch_link_metadata(url):
try:
r = httpx.get(url, timeout=10)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
title = (soup.find("meta", property="og:title") or soup.find("title"))
desc = (soup.find("meta", property="og:description") or soup.find("meta", attrs={"name": "description"}))
image = (soup.find("meta", property="og:image") or soup.find("meta", attrs={"name": "twitter:image"}))
return {
"title": title["content"] if title and title.has_attr("content") else (title.text if title else ""),
"description": desc["content"] if desc and desc.has_attr("content") else "",
"image": image["content"] if image and image.has_attr("content") else None,
}
except Exception as e:
logging.warning(f"Could not fetch link metadata for {url}: {e}")
return {}
def get_last_bsky(client, handle):
timeline = client.get_author_feed(handle)
for titem in timeline.feed:
# Only care about top-level, non-reply posts
if titem.reason is None and getattr(titem.post.record, "reply", None) is None:
logging.info("Record created %s", str(titem.post.record.created_at))
return arrow.get(titem.post.record.created_at)
return arrow.get(0)
def make_rich(content):
text_builder = client_utils.TextBuilder()
lines = content.split("\n")
for line in lines:
# If the line is a URL, make it a clickable link
if line.startswith("http"):
url = line.strip()
text_builder.link(url, url)
else:
tag_split = re.split("(#[a-zA-Z0-9]+)", line)
for i, t in enumerate(tag_split):
if i == len(tag_split) - 1:
t = t + "\n"
if t.startswith("#"):
text_builder.tag(t, t[1:].strip())
else:
text_builder.text(t)
return text_builder
# --- Nova funció: Només retorna el 'blob' necessari per a la miniatura de l'enllaç ---
def get_blob_from_url(image_url, client):
try:
r = httpx.get(image_url, timeout=10)
if r.status_code != 200:
return None
img_blob = client.upload_blob(r.content)
return img_blob.blob
except Exception as e:
logging.warning(f"Could not fetch/upload image from {image_url}: {e}")
return None
def is_html(text):
return bool(re.search(r'<.*?>', text))
def main():
# --- Parse command-line arguments ---
parser = argparse.ArgumentParser(description="Post RSS to Bluesky.")
parser.add_argument("rss_feed", help="RSS feed URL")
parser.add_argument("bsky_handle", help="Bluesky handle")
parser.add_argument("bsky_username", help="Bluesky username")
parser.add_argument("bsky_app_password", help="Bluesky app password")
parser.add_argument("--service", default="https://bsky.social", help="Bluesky server URL (default: https://bsky.social)")
# Nova opció per a l'idioma, per defecte en català ('ca')
parser.add_argument("--lang", default="ca", help="Language code for the post (default: ca)")
args = parser.parse_args()
feed_url = args.rss_feed
bsky_handle = args.bsky_handle
bsky_username = args.bsky_username
bsky_password = args.bsky_app_password
service_url = args.service
post_lang = args.lang
# --- Login ---
client = Client(base_url=service_url) # Inicialitzem directament amb el servidor personalitzat
backoff = 60
while True:
try:
logging.info(f"Attempting login to server: {service_url} with user: {bsky_username}")
client.login(bsky_username, bsky_password)
logging.info(f"Login successful for user: {bsky_username}")
break
except Exception as e:
logging.exception("Login exception")
time.sleep(backoff)
backoff = min(backoff + 60, 600)
# --- Get last Bluesky post time ---
last_bsky = get_last_bsky(client, bsky_handle)
# --- Parse feed ---
response = httpx.get(feed_url)
response.raise_for_status() # Comprova que la resposta sigui correcta
try:
# Detecta automàticament la codificació i converteix a UTF-8
result = charset_normalizer.from_bytes(response.content).best()
if not result or not hasattr(result, "text"):
raise ValueError("No s'ha pogut detectar la codificació del feed o el text no és accessible.")
feed_content = result.text # Contingut decodificat com UTF-8
except ValueError:
logging.warning("No s'ha pogut detectar la codificació amb charset_normalizer. Provant amb latin-1.")
try:
feed_content = response.content.decode("latin-1")
except UnicodeDecodeError:
logging.warning("No s'ha pogut decodificar amb latin-1. Provant amb utf-8 amb errors ignorats.")
feed_content = response.content.decode("utf-8", errors="ignore")
feed = fastfeedparser.parse(feed_content) # Passa el contingut decodificat al parser
# --- Inicialitzem el comptador d'entrades publicades ---
noves_entrades = 0
for item in feed.entries:
rss_time = arrow.get(item.published)
logging.info("RSS Time: %s", str(rss_time))
# Processar el títol per evitar problemes de codificació
title_text = process_title(item.title)
post_text = f"{title_text}\n{item.link}"
logging.info("Title+link used as content: %s", post_text)
rich_text = make_rich(post_text)
logging.info("Rich text length: %d" % (len(rich_text.build_text())))
logging.info("Filtered Content length: %d" % (len(post_text)))
# Si el RSS és més nou que l'últim post, publica
if rss_time > last_bsky:
link_metadata = fetch_link_metadata(item.link)
# --- 1. Obtenim el blob de la imatge per a la miniatura ---
thumb_blob = None
if link_metadata.get("image"):
thumb_blob = get_blob_from_url(link_metadata["image"], client)
# --- 2. Creem l'embed extern (targeta d'enllaç) i hi assignem la miniatura ---
embed = None
if link_metadata.get("title") or link_metadata.get("description") or thumb_blob:
embed = models.AppBskyEmbedExternal.Main(
external=models.AppBskyEmbedExternal.External(
uri=item.link,
title=link_metadata.get("title") or title_text or "Enllaç",
description=link_metadata.get("description") or "",
thumb=thumb_blob, # Aquí carreguem la imatge a la targeta
)
)
try:
logging.info("Test mode: Preparing to send post %s" % (item.link))
# Afegim langs=[post_lang] per especificar l'idioma
client.send_post(rich_text, embed=embed, langs=[post_lang])
logging.info("Test mode: Post prepared %s" % (item.link))
# Incrementem el comptador d'èxits
noves_entrades += 1
except Exception as e:
logging.exception("Failed to prepare post %s" % (item.link))
else:
logging.debug("Not sending %s" % (item.link))
# --- Resum final de l'execució ---
if noves_entrades > 0:
logging.info(f"🎉 Execució finalitzada: S'han publicat {noves_entrades} noves entrades a Bluesky.")
else:
logging.info(" Execució finalitzada: No hi havia cap entrada nova per publicar.")
if __name__ == "__main__":
main()