#!/usr/bin/env python3 """ bsky_post.py — Post text + optional image or video to a Bluesky instance. Usage examples: python3 bsky_post.py "DIVENDRES!!!!" --video friday.mp4 python3 bsky_post.py "Dijous!!!!" --image thursday.jpg python3 bsky_post.py "Bon dia!" python3 bsky_post.py "Bon dia!" --image photo.jpg --lang ca """ import argparse import logging import mimetypes import os import sys import time import random import re from dataclasses import dataclass from atproto import Client, client_utils, models # ============================================================ # Config # ============================================================ @dataclass(frozen=True) class RetryConfig: login_max_attempts: int = 5 login_base_delay_seconds: float = 10.0 login_max_delay_seconds: float = 600.0 login_jitter_seconds: float = 3 # ============================================================ # Logging # ============================================================ def setup_logging() -> None: logging.basicConfig( format="%(asctime)s %(message)s", level=logging.INFO, stream=sys.stdout, ) # ============================================================ # Text builder # ============================================================ def make_rich(content: str): text_builder = client_utils.TextBuilder() content = content.strip() lines = content.splitlines() for line_idx, line in enumerate(lines): if not line.strip(): if line_idx < len(lines) - 1: text_builder.text("\n") continue words = line.split(" ") for i, word in enumerate(words): if not word: if i < len(words) - 1: text_builder.text(" ") continue if word.startswith("http://") or word.startswith("https://"): text_builder.link(word, word) elif word.startswith("#") and len(word) > 1: tag_name = word[1:].rstrip(".,;:!?)'\"") if tag_name: text_builder.tag(word, tag_name) else: text_builder.text(word) else: text_builder.text(word) if i < len(words) - 1: text_builder.text(" ") if line_idx < len(lines) - 1: text_builder.text("\n") return text_builder # ============================================================ # Error helpers # ============================================================ def is_rate_limited_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "429" in text or "ratelimitexceeded" in text or "too many requests" in text or "rate limit" in text ) def is_auth_error(error_obj) -> bool: text = repr(error_obj).lower() return ( "401" in text or "403" in text or "invalid identifier or password" in text or "authenticationrequired" in text or "invalidtoken" in text ) def is_network_error(error_obj) -> bool: text = repr(error_obj) signals = [ "ConnectError", "RemoteProtocolError", "ReadTimeout", "WriteTimeout", "TimeoutException", "503", "502", "504", "ConnectionResetError", ] return any(sig in text for sig in signals) def is_transient_error(error_obj) -> bool: error_text = repr(error_obj) transient_signals = [ "InvokeTimeoutError", "ReadTimeout", "WriteTimeout", "TimeoutException", "RemoteProtocolError", "ConnectError", "503", "502", "504", ] return any(signal in error_text for signal in transient_signals) def get_rate_limit_wait_seconds(error_obj, default_delay: float, max_delay: float) -> float: """ Parse common rate-limit headers and return a bounded wait time in seconds. Supports: - retry-after - x-ratelimit-after - ratelimit-reset (unix timestamp) """ try: now_ts = int(time.time()) # Direct headers on exception headers = getattr(error_obj, "headers", None) or {} retry_after = headers.get("retry-after") or headers.get("Retry-After") if retry_after: return min(max(float(retry_after), 1.0), max_delay) x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") if x_after: return min(max(float(x_after), 1.0), max_delay) reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") if reset_value: wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) except Exception: pass try: # Nested response headers response = getattr(error_obj, "response", None) headers = getattr(response, "headers", None) or {} now_ts = int(time.time()) retry_after = headers.get("retry-after") or headers.get("Retry-After") if retry_after: return min(max(float(retry_after), 1.0), max_delay) x_after = headers.get("x-ratelimit-after") or headers.get("X-RateLimit-After") if x_after: return min(max(float(x_after), 1.0), max_delay) reset_value = headers.get("ratelimit-reset") or headers.get("RateLimit-Reset") if reset_value: wait_seconds = max(float(reset_value) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) except Exception: pass # repr fallback parsing text = repr(error_obj) m = re.search(r"'retry-after': '(\d+)'", text, re.IGNORECASE) if m: return min(max(float(m.group(1)), 1.0), max_delay) m = re.search(r"'x-ratelimit-after': '(\d+)'", text, re.IGNORECASE) if m: return min(max(float(m.group(1)), 1.0), max_delay) m = re.search(r"'ratelimit-reset': '(\d+)'", text, re.IGNORECASE) if m: now_ts = int(time.time()) wait_seconds = max(float(m.group(1)) - now_ts + 1.0, default_delay) return min(wait_seconds, max_delay) return default_delay # ============================================================ # Login with backoff # ============================================================ def login_with_backoff( client: Client, username: str, password: str, service_url: str, max_attempts: int = 5, base_delay: float = 10.0, max_delay: float = 600.0, jitter: float = 1.5, ) -> bool: for attempt in range(1, max_attempts + 1): try: logging.info( f"🔑 Login attempt {attempt}/{max_attempts} → {service_url} as {username}" ) client.login(username, password) logging.info("✅ Login successful.") return True except Exception as e: logging.exception("❌ Login exception") # Fail fast on invalid credentials if is_auth_error(e): logging.error("❌ Bad credentials. Check handle/password.") return False # Respect explicit rate-limit timing if is_rate_limited_error(e): if attempt < max_attempts: wait = get_rate_limit_wait_seconds(e, default_delay=base_delay, max_delay=max_delay) wait = wait + random.uniform(0, jitter) logging.warning( f"⏳ Rate-limited on login (attempt {attempt}/{max_attempts}). " f"Retrying in {wait:.1f}s..." ) time.sleep(wait) continue logging.error("❌ Exhausted login retries due to rate limiting.") return False # Retry transient/network problems if is_network_error(e) or is_transient_error(e): if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning( f"⏳ Transient login error (attempt {attempt}/{max_attempts}). " f"Retrying in {wait:.1f}s..." ) time.sleep(wait) continue logging.error("❌ Exhausted login retries after transient/network errors.") return False # Unknown errors: bounded retry anyway if attempt < max_attempts: wait = min(base_delay * attempt, max_delay) + random.uniform(0, jitter) logging.warning( f"⏳ Unknown login error (attempt {attempt}/{max_attempts}). " f"Retrying in {wait:.1f}s..." ) time.sleep(wait) continue return False # ============================================================ # Media upload # ============================================================ def detect_mime_type(path: str) -> str: mime, _ = mimetypes.guess_type(path) if mime: return mime ext = os.path.splitext(path)[1].lower() fallbacks = { ".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp", ".mp4": "video/mp4", ".mov": "video/quicktime", ".webm": "video/webm", } return fallbacks.get(ext, "application/octet-stream") def upload_image( client: Client, image_path: str, alt_text: str = "", ) -> models.AppBskyEmbedImages.Image | None: try: mime = detect_mime_type(image_path) with open(image_path, "rb") as f: data = f.read() logging.info(f"🖼️ Uploading image: {image_path} ({len(data)/1024:.1f} KB, {mime})") response = client.upload_blob(data) logging.info("✅ Image uploaded successfully.") return models.AppBskyEmbedImages.Image( image=response.blob, alt=alt_text, ) except Exception as e: logging.error(f"❌ Failed to upload image: {repr(e)}") return None def upload_video( client: Client, video_path: str, alt_text: str = "", ) -> models.AppBskyEmbedVideo.Main | None: try: mime = detect_mime_type(video_path) with open(video_path, "rb") as f: data = f.read() logging.info(f"🎬 Uploading video: {video_path} ({len(data)/1024:.1f} KB, {mime})") response = client.upload_blob(data) logging.info("✅ Video blob uploaded successfully.") return models.AppBskyEmbedVideo.Main( video=response.blob, alt=alt_text, ) except Exception as e: logging.error(f"❌ Failed to upload video: {repr(e)}") return None # ============================================================ # Post # ============================================================ def post_to_bsky( client: Client, text: str, langs: list[str], image_path: str | None = None, video_path: str | None = None, alt_text: str = "", ) -> bool: rich_text = make_rich(text) embed = None if image_path: image = upload_image(client, image_path, alt_text=alt_text) if not image: logging.error("❌ Aborting post: image upload failed.") return False embed = models.AppBskyEmbedImages.Main(images=[image]) elif video_path: video = upload_video(client, video_path, alt_text=alt_text) if not video: logging.error("❌ Aborting post: video upload failed.") return False embed = video try: logging.info(f"🚀 Sending post (langs={langs}, text={text!r})") result = client.send_post(text=rich_text, embed=embed, langs=langs) uri = getattr(result, "uri", None) logging.info(f"✅ Post published! URI: {uri}") return True except Exception as e: logging.error(f"❌ Failed to send post: {repr(e)}") return False # ============================================================ # CLI # ============================================================ def main(): setup_logging() parser = argparse.ArgumentParser( description="Post text + optional image or video to a Bluesky instance." ) parser.add_argument("text", help="Post text content") parser.add_argument("--username", required=True, help="Bluesky handle or email") parser.add_argument("--password", required=True, help="Bluesky app password") parser.add_argument("--service", default="https://bsky.social", help="Bluesky PDS URL") parser.add_argument("--lang", default="ca", help="Comma-separated language codes (e.g. ca,es)") parser.add_argument("--image", default=None, help="Path to image file to attach") parser.add_argument("--video", default=None, help="Path to video file to attach") parser.add_argument("--alt", default="", help="Alt text for media") args = parser.parse_args() client = Client(base_url=args.service) success = login_with_backoff( client, args.username, args.password, args.service, max_attempts=RetryConfig.login_max_attempts, base_delay=RetryConfig.login_base_delay_seconds, max_delay=RetryConfig.login_max_delay_seconds, jitter=RetryConfig.login_jitter_seconds, ) if not success: sys.exit(1) langs = [l.strip() for l in args.lang.split(",") if l.strip()] post_success = post_to_bsky( client, text=args.text, langs=langs, image_path=args.image, video_path=args.video, alt_text=args.alt, ) if not post_success: sys.exit(1) if __name__ == "__main__": main()