feat: add bsky_post.py and Jenkinsfile for weekly Dijous post

- New standalone script `bsky_post.py` to post text + optional image
  or video to a Bluesky PDS instance. Supports --image, --video (mutually
  exclusive), --alt, --lang, --service, --username, --password flags.
  Reuses the same login-with-backoff pattern as rss2bsky and the Twitter bot.

- New Jenkinsfile that triggers every Thursday at 07:15 (cron: 15 7 * * 4),
  reads credentials from Jenkins secret text entries, and runs bsky_post.py
  with the configured image and Catalan language tag.
This commit is contained in:
Guillem Hernandez Sola
2026-05-07 09:21:47 +02:00
parent 33b4e32738
commit 26027c6e91
6 changed files with 356 additions and 0 deletions

314
bsky_post.py Normal file
View File

@@ -0,0 +1,314 @@
#!/usr/bin/env python3
"""
bsky_post.py — Post text + optional image or video to a Bluesky instance.
Usage examples:
python3 bsky_post.py "DIVENDRES!!!!" --video friday.mp4
python3 bsky_post.py "Dijous!!!!" --image thursday.jpg
python3 bsky_post.py "Bon dia!"
python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca
"""
import argparse
import logging
import mimetypes
import os
import sys
import time
import random
import re
from atproto import Client, client_utils, models
# ============================================================
# Logging
# ============================================================
def setup_logging() -> None:
logging.basicConfig(
format="%(asctime)s %(message)s",
level=logging.INFO,
stream=sys.stdout,
)
# ============================================================
# Text builder
# ============================================================
def make_rich(content: str):
text_builder = client_utils.TextBuilder()
content = content.strip()
lines = content.splitlines()
for line_idx, line in enumerate(lines):
if not line.strip():
if line_idx < len(lines) - 1:
text_builder.text("\n")
continue
words = line.split(" ")
for i, word in enumerate(words):
if not word:
if i < len(words) - 1:
text_builder.text(" ")
continue
if word.startswith("http://") or word.startswith("https://"):
text_builder.link(word, word)
elif word.startswith("#") and len(word) > 1:
tag_name = word[1:].rstrip(".,;:!?)'\"")
if tag_name:
text_builder.tag(word, tag_name)
else:
text_builder.text(word)
else:
text_builder.text(word)
if i < len(words) - 1:
text_builder.text(" ")
if line_idx < len(lines) - 1:
text_builder.text("\n")
return text_builder
# ============================================================
# Error helpers
# ============================================================
def is_rate_limited_error(e) -> bool:
text = repr(e)
return any(s in text for s in ["429", "RateLimitExceeded", "Too Many Requests"])
def is_auth_error(e) -> bool:
text = repr(e).lower()
return any(s in text for s in ["401", "403", "invalid identifier or password",
"authenticationrequired", "invalidtoken"])
def is_network_error(e) -> bool:
text = repr(e)
return any(s in text for s in ["ConnectError", "RemoteProtocolError", "ReadTimeout",
"WriteTimeout", "TimeoutException", "503", "502", "504",
"ConnectionResetError"])
def is_timeout_error(e) -> bool:
text = repr(e)
return any(s in text for s in ["InvokeTimeoutError", "ReadTimeout",
"WriteTimeout", "TimeoutException"])
# ============================================================
# Login with backoff (same pattern as rss2bsky / twitter bot)
# ============================================================
def login_with_backoff(
client: Client,
username: str,
password: str,
service_url: str,
max_attempts: int = 5,
base_delay: float = 2.0,
max_delay: float = 120.0,
jitter: float = 1.5,
) -> bool:
for attempt in range(1, max_attempts + 1):
try:
logging.info(
f"🔐 Login attempt {attempt}/{max_attempts}{service_url} as {username}"
)
client.login(username, password)
logging.info("✅ Login successful.")
return True
except Exception as e:
logging.exception("❌ Login exception")
if is_rate_limited_error(e):
if attempt < max_attempts:
delay = min(base_delay * (2 ** (attempt - 1)), max_delay) + random.uniform(0, jitter)
logging.warning(f"⏳ Rate-limited on login. Retrying in {delay:.1f}s...")
time.sleep(delay)
continue
logging.error("❌ Login rate-limited and retries exhausted.")
return False
if is_auth_error(e):
logging.error("❌ Bad credentials. Check handle/password.")
return False
if attempt < max_attempts and (is_network_error(e) or is_timeout_error(e)):
delay = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
logging.warning(f"⏳ Transient login error. Retrying in {delay:.1f}s...")
time.sleep(delay)
continue
if attempt < max_attempts:
delay = min(base_delay * attempt, max_delay) + random.uniform(0, jitter)
logging.warning(f"⏳ Unknown login error. Retrying in {delay:.1f}s...")
time.sleep(delay)
continue
return False
return False
# ============================================================
# Media upload
# ============================================================
def detect_mime_type(path: str) -> str:
mime, _ = mimetypes.guess_type(path)
if mime:
return mime
ext = os.path.splitext(path)[1].lower()
fallbacks = {
".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".png": "image/png", ".gif": "image/gif",
".webp": "image/webp",
".mp4": "video/mp4", ".mov": "video/quicktime",
".webm": "video/webm",
}
return fallbacks.get(ext, "application/octet-stream")
def upload_image(client: Client, image_path: str, alt_text: str = "") -> models.AppBskyEmbedImages.Image | None:
try:
mime = detect_mime_type(image_path)
with open(image_path, "rb") as f:
data = f.read()
logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})")
response = client.upload_blob(data)
logging.info("✅ Image uploaded successfully.")
return models.AppBskyEmbedImages.Image(
image=response.blob,
alt=alt_text,
)
except Exception as e:
logging.error(f"❌ Failed to upload image: {repr(e)}")
return None
def upload_video(client: Client, video_path: str, alt_text: str = "") -> models.AppBskyEmbedVideo.Main | None:
try:
mime = detect_mime_type(video_path)
with open(video_path, "rb") as f:
data = f.read()
logging.info(f"🎬 Uploading video: {video_path} ({len(data)/1024:.1f} KB, {mime})")
response = client.upload_blob(data)
logging.info("✅ Video blob uploaded successfully.")
return models.AppBskyEmbedVideo.Main(
video=response.blob,
alt=alt_text,
)
except Exception as e:
logging.error(f"❌ Failed to upload video: {repr(e)}")
return None
# ============================================================
# Post
# ============================================================
def post_to_bsky(
client: Client,
text: str,
langs: list[str],
image_path: str | None = None,
video_path: str | None = None,
alt_text: str = "",
) -> bool:
rich_text = make_rich(text)
embed = None
if image_path:
image = upload_image(client, image_path, alt_text=alt_text)
if not image:
logging.error("❌ Aborting post: image upload failed.")
return False
embed = models.AppBskyEmbedImages.Main(images=[image])
elif video_path:
video = upload_video(client, video_path, alt_text=alt_text)
if not video:
logging.error("❌ Aborting post: video upload failed.")
return False
embed = video
try:
logging.info(f"🚀 Sending post (langs={langs}, text={text!r})")
result = client.send_post(text=rich_text, embed=embed, langs=langs)
uri = getattr(result, "uri", None)
logging.info(f"✅ Post published! URI: {uri}")
return True
except Exception as e:
logging.error(f"❌ Failed to send post: {repr(e)}")
return False
# ============================================================
# CLI
# ============================================================
def main():
setup_logging()
parser = argparse.ArgumentParser(
description="Post text + optional image or video to a Bluesky instance."
)
parser.add_argument("text", help="Post text content")
parser.add_argument("--username", required=True, help="Bluesky handle or email")
parser.add_argument("--password", required=True, help="Bluesky app password")
parser.add_argument("--service", default="https://eurosky.social", help="Bluesky PDS URL")
parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)")
parser.add_argument("--image", default=None, help="Path to image file to attach")
parser.add_argument("--video", default=None, help="Path to video file to attach")
parser.add_argument("--alt", default="", help="Alt text for the attached media")
args = parser.parse_args()
if args.image and args.video:
logging.error("❌ Cannot attach both --image and --video at the same time.")
sys.exit(1)
if args.image and not os.path.isfile(args.image):
logging.error(f"❌ Image file not found: {args.image}")
sys.exit(1)
if args.video and not os.path.isfile(args.video):
logging.error(f"❌ Video file not found: {args.video}")
sys.exit(1)
langs = [l.strip() for l in args.lang.split(",") if l.strip()] or ["ca"]
logging.info(f"🌍 Language(s): {langs}")
client = Client(base_url=args.service)
logged_in = login_with_backoff(
client=client,
username=args.username,
password=args.password,
service_url=args.service,
)
if not logged_in:
logging.error("❌ Could not log in. Exiting.")
sys.exit(1)
success = post_to_bsky(
client=client,
text=args.text,
langs=langs,
image_path=args.image,
video_path=args.video,
alt_text=args.alt,
)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()